久久国产精品久久国产品这里,亚洲а∨天堂久久,国产真实露脸乱子伦原著,亚洲 日韩 国产 中文有码

    <i id="s5xwn"></i>

  1. <ul id="s5xwn"></ul>

  2. 您的位置:首頁 >資訊 > 正文

    環(huán)球速遞!大數(shù)據(jù)NiFi(十八):離線同步MySQL數(shù)據(jù)到HDFS

    來源:騰訊云2023-02-21 20:16:57

    ?離線同步MySQL數(shù)據(jù)到HDFS

    案例:使用NiFi將MySQL中數(shù)據(jù)導(dǎo)入到HDFS中。


    【資料圖】

    以上案例用到的處理器有“QueryDatabaseTable”、“ConvertAvroToJSON”、“SplitJson”、“PutHDFS”四個處理器。

    一、配置“QueryDatabaseTable”處理器

    該處理器主要使用提供的SQL語句或者生成SQL語句來查詢MySQL中的數(shù)據(jù),查詢結(jié)果轉(zhuǎn)換成Avro格式。該處理器只能運行在主節(jié)點上。

    關(guān)于“QueryDatabaseTable”處理器的“Properties”配置的說明如下:

    配置項

    默認值

    允許值

    描述

    Database Connection Pooling Service(數(shù)據(jù)庫連接池服務(wù))

    用于獲得與數(shù)據(jù)庫的連接的Controller Service。

    Database Type(數(shù)據(jù)庫類型)

    Generic

    選擇數(shù)據(jù)庫類型。Generic 通用類型OracleOracle 12+MS SQL 2012+MS SQL 2008MySQLPostgreSQL

    Table Name(表名)

    查詢數(shù)據(jù)庫的表名,當使用“Custom Query”時,此為查詢結(jié)果的別名,并作為FlowFile中的屬性。

    Columns to Return(返回的列)

    查詢返回的列,多個列使用逗號分隔。如果列中有特殊名稱需要加引號,則所有列都需要加引號處理。

    Additional WHERE clause(where條件)

    在構(gòu)建SQL查詢時添加到WHERE條件中的自定義子句。

    Custom Query(自定義SQL查詢)

    自定義的SQL語句。該查詢被構(gòu)建成子查詢,設(shè)置后不會從其他屬性構(gòu)建SQL查詢。自定義SQL不支持Order by查詢。

    Maximum-value Columns(最大值列)

    指定增量查詢獲取最大值的列,多列使用逗號分開。指定后,這個處理器只能檢索到添加/更新的行。不能設(shè)置無法比較大小的列,例如:boolean/bit。如果不指定,則參照表中所有的列來查詢?nèi)繑?shù)據(jù),這會對性能產(chǎn)生影響。

    Max Wait Time(最大超時時間)

    0 seconds

    SQL查詢最大時長,默認為0沒有限制,設(shè)置小于0的時間默認為0。

    Fetch Size(拉取數(shù)據(jù)量)

    0

    每次從查詢結(jié)果中拉取的數(shù)據(jù)量。

    Max Rows Per Flow File(每個FlowFile行數(shù))

    0

    在一個FlowFile文件中的數(shù)據(jù)行數(shù)。通過這個參數(shù)可以將很大的結(jié)果集分到多個FlowFile中。默認設(shè)置為0,所有結(jié)果存入一個FlowFile。

    Output Batch Size(數(shù)據(jù)輸出批次量)

    0

    輸出的FlowFile批次數(shù)據(jù)大小,當設(shè)置為0代表所有數(shù)據(jù)輸出到下游關(guān)系。如果數(shù)據(jù)量很大,則有可能下游很久沒有收到數(shù)據(jù),如果設(shè)置了,則每次達到該數(shù)據(jù)量就釋放數(shù)據(jù),傳輸?shù)较掠巍?/p>

    Maximum Number of Fragments(最大片段數(shù))

    0

    設(shè)置返回的最大數(shù)據(jù)片段數(shù),設(shè)置0默認將所有數(shù)據(jù)片段返回,如果表非常大,設(shè)置后可以防止OOM錯誤。

    Normalize Table/Column Names(標準表/列名)

    false

    truefalse

    是否將列名中不兼容avro的字符修改為兼容avro的字符。例如,冒號和句點將被更改為下劃線,以構(gòu)建有效的Avro記錄。

    Transaction Isolation Level

    設(shè)置事務(wù)隔離級別。

    Use Avro Logical Types(使用Avro邏輯類型)

    false

    truefalse

    是否對DECIMAL/NUMBER, DATE, TIME 和 TIMESTAMP 列使用Avro邏輯類型。

    Default Decimal Precision(Decimal數(shù)據(jù)類型位數(shù))

    10

    當 DECIMAL/NUMBER 數(shù)據(jù)類型轉(zhuǎn)換成Avro類型數(shù)據(jù)時,指定的數(shù)據(jù)位數(shù)。

    Default Decimal Scale(Decimal 數(shù)據(jù)類型小數(shù)位數(shù))

    0

    當 DECIMAL/NUMBER 數(shù)據(jù)類型轉(zhuǎn)換成Avro類型數(shù)據(jù)時,指定的小數(shù)點后的位數(shù)。

    Generic 通用類型OracleOracle 12+MS SQL 2012+MS SQL 2008MySQLPostgreSQL

    Table Name(表名)查詢數(shù)據(jù)庫的表名,當使用“Custom Query”時,此為查詢結(jié)果的別名,并作為FlowFile中的屬性。 Columns to Return (返回的列) 查詢返回的列,多個列使用逗號分隔。如果列中有特殊名稱需要加引號,則所有列都需要加引號處理。 Additional WHERE clause (where條件) 在構(gòu)建SQL查詢時添加到WHERE條件中的自定義子句。 Custom Query (自定義SQL查詢) 自定義的SQL語句。該查詢被構(gòu)建成子查詢,設(shè)置后不會從其他屬性構(gòu)建SQL查詢。自定義SQL不支持Order by查詢。 Maximum-value Columns (最大值列) 指定增量查詢獲取最大值的列,多列使用逗號分開。指定后,這個處理器只能檢索到添加/更新的行。不能設(shè)置無法比較大小的列,例如:boolean/bit。如果不指定,則參照表中所有的列來查詢?nèi)繑?shù)據(jù),這會對性能產(chǎn)生影響。 Max Wait Time(最大超時時間)0 seconds SQL查詢最大時長,默認為0沒有限制,設(shè)置小于0的時間默認為0。 Fetch Size(拉取數(shù)據(jù)量)0 每次從查詢結(jié)果中拉取的數(shù)據(jù)量。 Max Rows Per Flow File(每個FlowFile行數(shù))0 在一個FlowFile文件中的數(shù)據(jù)行數(shù)。通過這個參數(shù)可以將很大的結(jié)果集分到多個FlowFile中。默認設(shè)置為0,所有結(jié)果存入一個FlowFile。 Output Batch Size(數(shù)據(jù)輸出批次量)0 輸出的FlowFile批次數(shù)據(jù)大小,當設(shè)置為0代表所有數(shù)據(jù)輸出到下游關(guān)系。如果數(shù)據(jù)量很大,則有可能下游很久沒有收到數(shù)據(jù),如果設(shè)置了,則每次達到該數(shù)據(jù)量就釋放數(shù)據(jù),傳輸?shù)较掠巍? Maximum Number of Fragments(最大片段數(shù))0 設(shè)置返回的最大數(shù)據(jù)片段數(shù),設(shè)置0默認將所有數(shù)據(jù)片段返回,如果表非常大,設(shè)置后可以防止OOM錯誤。 Normalize Table/Column Names(標準表/列名)false true false 是否將列名中不兼容avro的字符修改為兼容avro的字符。例如,冒號和句點將被更改為下劃線,以構(gòu)建有效的Avro記錄。 Transaction Isolation Level 設(shè)置事務(wù)隔離級別。 Use Avro Logical Types(使用Avro邏輯類型)false true false 是否對DECIMAL/NUMBER, DATE, TIME 和 TIMESTAMP 列使用Avro邏輯類型。 Default Decimal Precision(Decimal數(shù)據(jù)類型位數(shù))10 當 DECIMAL/NUMBER 數(shù)據(jù)類型轉(zhuǎn)換成Avro類型數(shù)據(jù)時,指定的數(shù)據(jù)位數(shù)。 Default Decimal Scale(Decimal 數(shù)據(jù)類型小數(shù)位數(shù))0 當 DECIMAL/NUMBER 數(shù)據(jù)類型轉(zhuǎn)換成Avro類型數(shù)據(jù)時,指定的小數(shù)點后的位數(shù)。

    配置步驟如下:

    1、新建“QueryDatabaseTable”處理器

    2、配置“SCHEDULING”調(diào)度時間

    這里調(diào)度時間配置為99999s,讀取數(shù)據(jù)庫,這里讀取一次即可,默認0會不間斷讀取數(shù)據(jù)庫會對服務(wù)器造成非常大壓力。執(zhí)行僅支持“Primary”主節(jié)點運行。

    3、配置“PROPERTIES”

    配置“Database Connection Pooling Service”選擇創(chuàng)建,在彈出頁面中可以按照默認選擇直接點擊“Create”。

    點擊“->”繼續(xù)配置MySQL連接:

    在彈出的頁面中填入:

    連接MysqlURL:

    jdbc:mysql://192.168.179.5:3306/mynifi?characterEncoding=UTF-8&useSSL=false

    MySQL驅(qū)動類:com.mysql.jdbc.DriverMySQL jar包路徑:需要提前在NiFI集群各個節(jié)點上創(chuàng)建對應(yīng)目錄并上傳jar包。連接mysql的用戶名和密碼。

    通過以上配置好連接mysql如下:

    配置其他屬性如下:

    二、???????配置“ConvertAvroToJSON”處理器

    此處理器是將二進制Avro記錄轉(zhuǎn)換為JSON對象,提供了一個從Avro字段到JSON字段的直接映射,這樣得到的JSON將具有與Avro文檔相同的層次結(jié)構(gòu)。輸出的JSON編碼為UTF-8編碼,如果傳入的FlowFile包含多個Avro記錄,則轉(zhuǎn)換后的FlowFile是一個含有所有Avro記錄的JSON數(shù)組或一個JSON對象序列(每個Json對象單獨成行)。如果傳入的FlowFile不包含任何記錄,則輸出一個空JSON對象。

    關(guān)于“ConvertAvroToJSON”處理器的“Properties”配置的說明如下:

    配置項

    默認值

    允許值

    描述

    JSON container options(Json選擇)

    array

    nonearray

    如何解析Json對象,none:解析Json將每個Json對象寫入新行。array:解析到的json存入JsonArray一個對象

    Wrap Single Record(數(shù)據(jù)庫類型)

    false

    truefalse

    指定解析到的空記錄或者單條記錄是否按照“JSON container options”配置包裝對象。

    Avro schema(表名)

    如果Avro數(shù)據(jù)沒有Schema信息,需要配置。

    配置步驟如下:

    1、創(chuàng)建“ConvertAvroToJSON”處理器

    2、配置“PROPERTIES”

    3、連接“QueryDatabaseTable”處理器和“CovertAvroToJSON”處理器

    連接好兩個處理器后,可以配置“Connection”為負載均衡方式傳遞數(shù)據(jù):

    三、???????配置“SplitJson”處理器

    該處理器使用JsonPath表達式指定需要的Json數(shù)組元素,將Json數(shù)組中的多個Json對象切分出來,形成多個FlowFile。每個生成的FlowFile都由指定數(shù)組中的一個元素組成,并傳輸?shù)疥P(guān)系"split",原始文件傳輸?shù)疥P(guān)系"original"。如果沒有找到指定的JsonPath,或者沒有對數(shù)組元素求值,則將原始文件路由到"failure",不會生成任何文件。

    關(guān)于“SplitJson”處理器的“Properties”配置的說明如下:

    配置項

    默認值

    允許值

    描述

    JsonPath Expression(Json表達式)

    一個JsonPath表達式,它指定用以分割的數(shù)組元素。

    Null Value Representation(Null值表示)

    empty string

    empty stringthe string "null"

    指定結(jié)果為空值時的表示形式。

    配置步驟如下:

    1、創(chuàng)建“SplitJson”處理器

    2、配置“PROPERTIES”

    3、連接“ConvertAvroToJSON”處理器和“SplitJson”處理器

    連接后,連接關(guān)系選擇“success”:

    同時配置“ConverAvroToJSON”處理失敗的數(shù)據(jù)自動終止:

    四、配置“PutHDFS”處理器

    該處理器是將FlowFile數(shù)據(jù)寫入到HDFS分布式文件系統(tǒng)中。關(guān)于“PutHDFS”處理器的“Properties”主要配置的說明如下:

    配置項

    默認值

    允許值

    描述

    Hadoop Configuration Resources(Hadoop配置)

    nonearray

    HDFS配置文件,一個文件或者由逗號分隔的多個文件。不配置將在ClassPath中尋找‘core-site.xml’或者‘hdfs-site.xml’文件。

    Directory(目錄)

    需要寫入文件的HDFS父目錄。如果目錄不存在,將創(chuàng)建該目錄。

    Conflict Resolution Strategy(沖突解決)

    fail

    replaceignorefailappend

    指示當輸出目錄中已經(jīng)存在同名文件時如何處理。

    配置步驟如下:

    1、創(chuàng)建“PutHDFS”處理器

    2、配置“PROPERTIES”

    注意:以上需要在各個NiFi集群節(jié)點上創(chuàng)建“/root/test”目錄,并且在該目錄下上傳hdfs-site.xml和core-site.xml文件。

    3、連接“SplitJson”處理器和“PutHDFS”處理器

    同時設(shè)置“SplitJson”處理器中“failure”和“original”數(shù)據(jù)關(guān)系自動終止。

    設(shè)置“PutHDFS”處理器“success”和“failure”數(shù)據(jù)關(guān)系自動終止:

    配置好的連接關(guān)系如下:

    五、??????????????運行測試

    1、在MySQL創(chuàng)建庫“mynifi”,并且創(chuàng)建表“test1”,向表中插入10條數(shù)據(jù)

    mysql> create database mynifi;Query OK, 1 row affected (0.02 sec)mysql> use mynifi;Database changedmysql> create table test1(id int,name varchar(255),age int );Query OK, 0 rows affected (0.07 sec)mysql> insert into test1 values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tt",22)

    2、首先啟動“QueryDatabaseTable”處理器觀察隊列數(shù)據(jù)

    3、單獨啟動“ConvertAvroToJson”處理器觀察隊列數(shù)據(jù)

    4、單獨啟動“SplitJson”處理器觀察隊列數(shù)據(jù)

    5、單獨啟動“PutHDFS”處理器觀察HDFS對應(yīng)目錄數(shù)據(jù)

    查看數(shù)據(jù):

    注意:

    如果在“QueryDatabaseTable”處理器中設(shè)置增屬性“Maximum-value Columns”為id,那么每次查詢都是大于id的增量數(shù)據(jù)。如果想要存入HDFS文件為多行而不是一行,可以將“CovertAvroToJson”處理器屬性“JSON container options”設(shè)置為none,直接解析Avro文件得到一個個json數(shù)據(jù),然后直接連接“PutHDFS”處理器即可。

    最近更新