[Demo 3] Sync Data from MySQL to Hive Using SeaTunnel
We are launching a demo presentation plan in the Apache SeaTunnel community to showcase how to use connectors. We invite all data synchronization enthusiasts to share their knowledge and practical experience!
Our third session covers how to synchronize data from MySQL to Hive using SeaTunnel.
Highlight! Highlight! If you’re a SeaTunnel user and want to see a demo of a specific synchronization scenario, please scroll to the bottom and leave a comment. We will prioritize recording demos of the most requested synchronization scenarios!
Demo Plan Goals
We aim to create a platform for sharing and learning. Through specific demo presentations and documentation, we hope to help community members better understand and apply various data connectors. These demos help beginners learn quickly and give seasoned experts a stage to showcase innovative solutions.
Previous Demo Tutorials:
Session 1: [Demo 1] Syncing Data From MySQL to Doris Using SeaTunnel
Session 2: [Demo 2] Sync Data From MySQL CDC to Doris by Apache SeaTunnel
Description
Write data to Hive.
Tips
To use this connector, ensure your Spark/Flink cluster is already integrated with Hive. The tested Hive version is 2.3.9.
If you use SeaTunnel Engine, you need to put seatunnel-hadoop3-3.1.4-uber.jar, hive-exec-3.1.3.jar, and libfb303-0.9.3.jar in the $SEATUNNEL_HOME/lib/ directory.
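For reference, these jars can be fetched from Maven Central. The commands below are only a sketch; the seatunnel-hadoop3 uber jar version is a placeholder that must match the SeaTunnel release you actually run.
# place the Hive-related jars where the SeaTunnel Engine can load them
cd $SEATUNNEL_HOME/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar
wget https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
# replace <seatunnel-version> with your SeaTunnel release version
wget https://repo1.maven.org/maven2/org/apache/seatunnel/seatunnel-hadoop3-3.1.4-uber/<seatunnel-version>/seatunnel-hadoop3-3.1.4-uber-<seatunnel-version>.jar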
Key features
exactly-once
By default, two-phase commit (2PC) is used to ensure exactly-once semantics.
file format
text
csv
parquet
orc
json
compress codec
lzo
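The file formats listed above are the Hive table storage formats the connector can write; the sink determines the format from the target table's metadata rather than from a sink option. As a hedged illustration (the column list is made up), an ORC-backed target table matching the table_name used in the Example below could be created like this:
-- hypothetical ORC-backed target table for the Example below
CREATE TABLE default.seatunnel_orc (
    id INT,
    name STRING,
    score INT
)
STORED AS ORC;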
Options
table_name [string]
Target Hive table name, e.g. db1.table1. If the source is in multi-table mode, you can use ${database_name}.${table_name} to generate the table name; the ${database_name} and ${table_name} placeholders will be replaced with the values of the CatalogTable generated by the source.
metastore_uri [string]
Hive metastore URI
hdfs_site_path [string]
The path of hdfs-site.xml, used to load the configuration of NameNodes.
hive_site_path [string]
The path of hive-site.xml, used to authenticate the Hive metastore.
hive.hadoop.conf [map]
Properties in Hadoop configuration files (core-site.xml, hdfs-site.xml, hive-site.xml).
hive.hadoop.conf-path [string]
The directory path from which to load the core-site.xml, hdfs-site.xml, and hive-site.xml files.
krb5_path [string]
The path of krb5.conf, used for Kerberos authentication (a combined sketch of the Kerberos-related options appears at the end of this options list).
kerberos_principal [string]
The principal of Kerberos
kerberos_keytab_path [string]
The keytab file path of Kerberos.
abort_drop_partition_metadata [boolean]
Flag to decide whether to drop partition metadata from the Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore; the data in the partition (the data generated during the synchronization process) will always be deleted.
common options
Sink plugin common parameters; please refer to Sink Common Options for details.
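Putting several of these options together, here is a rough sketch of a sink block for a Kerberos-secured cluster. The metastore host, realm, and file paths below are placeholders, not values taken from this demo.
Hive {
    table_name = "default.seatunnel_orc"
    metastore_uri = "thrift://your-metastore-host:9083"
    hive_site_path = "/etc/hive/conf/hive-site.xml"
    hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
    krb5_path = "/etc/krb5.conf"
    kerberos_principal = "hive/your-metastore-host@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/keytabs/hive.keytab"
    abort_drop_partition_metadata = true
}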
Example
Hive {
table_name = "default.seatunnel_orc"
metastore_uri = "thrift://namenode001:9083"
}
Example 1
We have a source table like this:
create table test_hive_source(
test_tinyint TINYINT,
test_smallint SMALLINT,
test_int INT,
test_bigint BIGINT,
test_boolean BOOLEAN,
test_float FLOAT,
test_double DOUBLE,
test_string STRING,
test_binary BINARY,
test_timestamp TIMESTAMP,
test_decimal DECIMAL(8,2),
test_char CHAR(64),
test_varchar VARCHAR(64),
test_date DATE,
test_array ARRAY<INT>,
test_map MAP<STRING, FLOAT>,
test_struct STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);
We need to read data from the source table and write to another table:
create table test_hive_sink_text_simple(
test_tinyint TINYINT,
test_smallint SMALLINT,
test_int INT,
test_bigint BIGINT,
test_boolean BOOLEAN,
test_float FLOAT,
test_double DOUBLE,
test_string STRING,
test_binary BINARY,
test_timestamp TIMESTAMP,
test_decimal DECIMAL(8,2),
test_char CHAR(64),
test_varchar VARCHAR(64),
test_date DATE
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);
The job config file can be like this:
env {
parallelism = 3
job.name="test_hive_source_to_hive"
}
source {
Hive {
table_name = "test_hive.test_hive_source"
metastore_uri = "thrift://ctyun7:9083"
}
}
sink {
# write the data read from the source table into the Hive sink table
Hive {
table_name = "test_hive.test_hive_sink_text_simple"
metastore_uri = "thrift://ctyun7:9083"
hive.hadoop.conf = {
bucket = "s3a://mybucket"
}
}
}
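With the config saved to a file (the name hive_to_hive.conf below is hypothetical), the job can be launched with the SeaTunnel Engine in local mode roughly like this:
cd $SEATUNNEL_HOME
# on recent releases the local-mode flag is -m local (older releases use -e local)
./bin/seatunnel.sh --config ./config/hive_to_hive.conf -m local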
Hive on S3
Step 1
Create the lib directory for Hive on EMR.
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
Step 2
Download the jars from Maven Central into the lib directory.
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
Step 3
Copy the jars from your EMR environment to the lib directory.
cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
Step 4
Run the job with the following config.
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
}
]
}
}
sink {
Hive {
table_name = "test_hive.test_hive_sink_on_s3"
metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
hive.hadoop.conf = {
bucket="s3://ws-package"
}
}
}
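After the job finishes, a quick sanity check is to query the sink table from Hive (assuming the target table test_hive.test_hive_sink_on_s3 already exists in the metastore):
-- expect the three rows produced by the FakeSource job
SELECT pk_id, name, score FROM test_hive.test_hive_sink_on_s3;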
Hive on OSS
Step 1
Create the lib directory for Hive on EMR.
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
Step 2
Download the jar from Maven Central into the lib directory.
cd ${SEATUNNEL_HOME}/plugins/Hive/lib
wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
Step 3
Copy the jars from your EMR environment to the lib directory and delete the conflicting jar.
cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
Step 4
Run the job with the following config.
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
}
]
}
}
sink {
Hive {
table_name = "test_hive.test_hive_sink_on_oss"
metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
hive.hadoop.conf-path = "/tmp/hadoop"
hive.hadoop.conf = {
bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
}
}
}
Example 2
We have multiple source tables like this:
create table test_1(
)
PARTITIONED BY (xx);
create table test_2(
)
PARTITIONED BY (xx);
...
We need to read data from these source tables and write to other tables:
The job config file can be like this:
env {
# You can set flink configuration here
parallelism = 3
job.name="test_hive_source_to_hive"
}
source {
Hive {
tables_configs = [
{
table_name = "test_hive.test_1"
metastore_uri = "thrift://ctyun6:9083"
},
{
table_name = "test_hive.test_2"
metastore_uri = "thrift://ctyun7:9083"
}
]
}
}
sink {
# write the data read from each source table into the corresponding Hive table
Hive {
table_name = "${database_name}.${table_name}"
metastore_uri = "thrift://ctyun7:9083"
}
}
About Apache SeaTunnel
Apache SeaTunnel is an easy-to-use, ultra-high-performance distributed data integration platform that supports real-time synchronization of massive amounts of data and can stably and efficiently synchronize hundreds of billions of records per day.
Welcome to fill out this form to be a speaker of Apache SeaTunnel: https://forms.gle/vtpQS6ZuxqXMt6DT6 :)
Why do we need Apache SeaTunnel?
Apache SeaTunnel does everything it can to solve the problems you may encounter in synchronizing massive amounts of data.
Data loss and duplication
Task buildup and latency
Low throughput
Long application-to-production cycle time
Lack of application status monitoring
Apache SeaTunnel Usage Scenarios
Massive data synchronization
Massive data integration
ETL of large volumes of data
Massive data aggregation
Multi-source data processing
Features of Apache SeaTunnel
Rich components
High scalability
Easy to use
Mature and stable
How to get started with Apache SeaTunnel quickly?
Want to experience Apache SeaTunnel quickly? SeaTunnel 2.1.0 takes 10 seconds to get you up and running.
https://seatunnel.apache.org/docs/2.1.0/developement/setup
How can I contribute?
We invite everyone interested in making local open source global to join the Apache SeaTunnel contributor family and foster open source together!
Submit an issue:
https://github.com/apache/seatunnel/issues
Contribute code to:
https://github.com/apache/seatunnel/pulls
Subscribe to the community development mailing list:
dev-subscribe@seatunnel.apache.org
Development mailing list:
dev@seatunnel.apache.org
Join Slack:
https://join.slack.com/t/apacheseatunnel/shared_invite/zt-1kcxzyrxz-lKcF3BAyzHEmpcc4OSaCjQ
Follow Twitter:
https://twitter.com/ASFSeaTunnel
Join us now!❤️❤️