2023-07-10 14:20:01 Source: DeepHub IMBA
After the release of Pandas 2.0 we published several benchmark articles. This time we look beyond Pandas and compare two commonly used parallel dataframe frameworks built for large-scale data processing: Polars and Dask.
In this article we use two similar scripts to run an extract, transform, and load (ETL) process.
The two scripts perform the following main tasks:
Extract data from two Parquet files. For the small dataset, the variable path1 is "yellow_tripdata/yellow_tripdata_2014-01"; for the medium dataset, path1 is "yellow_tripdata/yellow_tripdata"; for the large dataset, path1 is "yellow_tripdata/yellow_tripdata*.parquet" (both readers accept such glob patterns; see the sketch after this list).
Transform the data: a) join the two dataframes, b) compute the mean trip distance per PULocationID, c) keep only rows matching certain conditions, d) round the values from step b to 2 decimal places, e) rename the column "trip_distance" to "mean_trip_distance", and f) sort by the "mean_trip_distance" column.
Save the final result to a new file.
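As a side note, both libraries expand glob patterns themselves, which is how the large-dataset path picks up every monthly file in a single call. A minimal sketch, assuming the files sit under a yellow_tripdata/ directory:

import polars as pl
import dask.dataframe as dd

# Each call treats all files matching the glob as one logical dataset;
# both are lazy, so no data is actually read at this point.
lf = pl.scan_parquet("yellow_tripdata/yellow_tripdata*.parquet")
ddf = dd.read_parquet("yellow_tripdata/yellow_tripdata*.parquet")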
Script 1: Polars data loading and reading
def extraction():
    """
    Extract two datasets from parquet files
    """
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = pl_read_parquet(path1)

    path2 = "taxi+_zone_lookup.parquet"
    df_zone = pl_read_parquet(path2)

    return df_trips, df_zone

def pl_read_parquet(path):
    """
    Converting parquet file into Polars dataframe
    """
    df = pl.scan_parquet(path)
    return df
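Note that pl.scan_parquet performs no I/O: it returns a LazyFrame that only records a query plan, which is why the extraction timings below are so small. A minimal sketch of that behavior, assuming the small monthly file exists locally:

import polars as pl

lf = pl.scan_parquet("yellow_tripdata/yellow_tripdata_2014-01.parquet")
print(type(lf))    # polars LazyFrame: just a query plan, the file has not been read
df = lf.collect()  # collect() executes the plan and actually reads the data
print(df.shape)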
The transformation functions
def transformation(df_trips, df_zone):
    """
    Proceed to several transformations
    """
    df_trips = mean_test_speed_pl(df_trips)

    df = df_trips.join(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df.select(["Borough", "Zone", "trip_distance"])
    df = get_Queens_test_speed_pd(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
    df = sort_by_columns_desc(df, "mean_trip_distance")

    return df

def mean_test_speed_pl(df_pl):
    """
    Getting Mean per PULocationID
    """
    df_pl = df_pl.groupby("PULocationID").agg(pl.col(["trip_distance"]).mean())
    return df_pl

def get_Queens_test_speed_pd(df_pl):
    """
    Only getting Borough in Queens
    """
    df_pl = df_pl.filter(pl.col("Borough") == "Queens")
    return df_pl

def round_column(df, column, to_round):
    """
    Round numbers on columns
    """
    df = df.with_columns(pl.col(column).round(to_round))
    return df

def rename_column(df, column_old, column_new):
    """
    Renaming columns
    """
    df = df.rename({column_old: column_new})
    return df

def sort_by_columns_desc(df, column):
    """
    Sort by column
    """
    df = df.sort(column, descending=True)
    return df
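Since every helper above just returns a new LazyFrame, the same pipeline can also be written as one chained lazy query. A sketch of an equivalent version (transformation_chained is a hypothetical name; the column names and the pl import are the same as above):

def transformation_chained(df_trips, df_zone):
    # One-expression equivalent of the step-by-step pipeline above.
    # Note: newer Polars versions spell groupby as group_by.
    return (
        df_trips
        .groupby("PULocationID")
        .agg(pl.col("trip_distance").mean())
        .join(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
        .select(["Borough", "Zone", "trip_distance"])
        .filter(pl.col("Borough") == "Queens")
        .with_columns(pl.col("trip_distance").round(2))
        .rename({"trip_distance": "mean_trip_distance"})
        .sort("mean_trip_distance", descending=True)
    )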
Saving the result
def loading_into_parquet(df_pl):
    """
    Save dataframe in parquet
    """
    df_pl.collect(streaming=True).write_parquet("yellow_tripdata_pl.parquet")
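Here collect(streaming=True) runs the plan in streaming mode and only then writes the materialized DataFrame to disk. As a hedged alternative, recent Polars releases also offer LazyFrame.sink_parquet, which streams the result straight into the file without ever holding the full DataFrame in memory; a minimal sketch, assuming the installed Polars version ships this method:

def loading_into_parquet_streaming(df_pl):
    # Assumption: LazyFrame.sink_parquet is available in the installed Polars.
    # It executes the lazy plan in streaming mode and writes directly to disk.
    df_pl.sink_parquet("yellow_tripdata_pl.parquet")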
The rest of the code
import polars as pl
import time

# extraction, transformation, loading_into_parquet, and the helper
# functions are the ones defined above.

def main():
    print("Starting ETL for Polars")
    start_time = time.perf_counter()

    print("Extracting...")
    df_trips, df_zone = extraction()
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")

    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = time.perf_counter() - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")

    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")

    print(f"End ETL for Polars in {str(time.perf_counter() - start_time)}")

if __name__ == "__main__":
    main()
Script 2: Dask. The functions do the same as above, so the whole script is shown in one piece:
import dask.dataframe as dd
from dask.distributed import Client
import time

def extraction():
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = dd.read_parquet(path1)

    path2 = "taxi+_zone_lookup.parquet"
    df_zone = dd.read_parquet(path2)

    return df_trips, df_zone

def transformation(df_trips, df_zone):
    df_trips = mean_test_speed_dask(df_trips)

    df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df[["Borough", "Zone", "trip_distance"]]
    df = get_Queens_test_speed_dask(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
    df = sort_by_columns_desc(df, "mean_trip_distance")

    return df

def loading_into_parquet(df_dask):
    df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")

def mean_test_speed_dask(df_dask):
    df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})
    return df_dask

def get_Queens_test_speed_dask(df_dask):
    df_dask = df_dask[df_dask["Borough"] == "Queens"]
    return df_dask

def round_column(df, column, to_round):
    df[column] = df[column].round(to_round)
    return df

def rename_column(df, column_old, column_new):
    df = df.rename(columns={column_old: column_new})
    return df

def sort_by_columns_desc(df, column):
    df = df.sort_values(column, ascending=False)
    return df

def main():
    print("Starting ETL for Dask")
    start_time = time.perf_counter()

    client = Client()  # Start Dask Client

    df_trips, df_zone = extraction()
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")

    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = time.perf_counter() - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")

    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")

    print(f"End ETL for Dask in {str(time.perf_counter() - start_time)}")

    client.close()  # Close Dask Client

if __name__ == "__main__":
    main()
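Like pl.scan_parquet on the Polars side, dd.read_parquet is lazy: it only builds a task graph, and nothing runs until to_parquet or .compute() is called. A minimal sketch of that behavior, plus the diagnostic dashboard the distributed Client exposes, which is handy for watching CPU utilization during the runs (the small monthly file path is the same assumption as before):

import dask.dataframe as dd
from dask.distributed import Client

if __name__ == "__main__":
    client = Client()             # local cluster of worker processes
    print(client.dashboard_link)  # live dashboard: task stream, CPU and memory per worker

    ddf = dd.read_parquet("yellow_tripdata/yellow_tripdata_2014-01.parquet")
    print(type(ddf))              # a lazy dask DataFrame: a task graph, no data loaded yet

    mean_distance = ddf["trip_distance"].mean()  # still lazy: a dask Scalar
    print(mean_distance.compute())               # compute() is what actually runs the work

    client.close()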
Benchmark comparison
1. Small dataset
We use a 164 MB dataset. Data of this size is small for our purposes, and it is also very common in everyday work.
Below are the results of running each library five times:
Polars: (timing results figure)
Dask: (timing results figure)
2. Medium dataset
We use a 1.1 GB dataset. A dataset at this GB scale can still be loaded into memory in full, but it is considerably larger than the small one.
Polars: (timing results figure)
Dask: (timing results figure)
3. Large dataset
We use an 8 GB dataset. A dataset this large may not fit into memory in one go and needs the framework to handle it.
Polars: (timing results figure)
Dask: (timing results figure)
Summary
The results show that both Polars and Dask use lazy evaluation, so reading and transformation appear very fast, and the time to execute those stages barely changes with dataset size.
It is also clear that both libraries handle medium-sized datasets very well.
Because both Polars and Dask execute lazily, the figures below show the complete end-to-end ETL (averaged over five runs).
Polars won on both the small and the medium dataset. On the large dataset, however, Dask came out ahead, averaging 26 seconds.
This is probably related to Dask's parallel-computing optimizations: its official documentation claims that "Dask tasks run three times faster than Spark ETL queries and use less CPU resources."
Above is the configuration of the machine used for the tests. Dask occupied more of the CPU while computing, so its parallel performance can be said to be better.