
A Data Processing Performance Comparison of the Parallel Computing Frameworks Polars and Dask

2023-07-10 14:20:01  Source: DeepHub IMBA

After the release of Pandas 2.0 we published several benchmark articles. This time we look beyond Pandas and compare two parallel dataframe frameworks that are commonly used for big data processing.

In this article we use two similar scripts to run an extract, transform, load (ETL) process.



Test overview

The two scripts perform the following main steps:

Extract the data from two parquet files. For the small dataset, the variable path1 is "yellow_tripdata/yellow_tripdata_2014-01"; for the medium dataset, path1 is "yellow_tripdata/yellow_tripdata"; and for the large dataset, path1 is "yellow_tripdata/yellow_tripdata*.parquet" (see the short glob sketch after these steps);

Transform the data: a) join the two dataframes, b) compute the mean trip distance per PULocationID, c) keep only rows that meet certain conditions, d) round the values from step b to 2 decimal places, e) rename the column "trip_distance" to "mean_trip_distance", f) sort by the column "mean_trip_distance".

Save the final result to a new file.
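
For reference, both libraries accept glob patterns in the path, which is how the large-dataset case reads all monthly files at once. A minimal sketch, assuming the directory layout described above:

import dask.dataframe as dd
import polars as pl

# Wildcard path used for the large dataset: all monthly parquet files at once.
path_large = "yellow_tripdata/yellow_tripdata*.parquet"

lazy_df = pl.scan_parquet(path_large)  # Polars builds a lazy plan; nothing is read yet
dask_df = dd.read_parquet(path_large)  # Dask builds a lazy task graph over all matching files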

Scripts

1. Polars

Data loading and reading

def extraction():
    """
    Extract two datasets from parquet files
    """
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = pl_read_parquet(path1)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = pl_read_parquet(path2)
    return df_trips, df_zone


def pl_read_parquet(path):
    """
    Converting parquet file into Polars dataframe
    """
    df = pl.scan_parquet(path)
    return df
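
Because pl.scan_parquet returns a LazyFrame instead of reading the data, the extraction step itself does almost no work. A small check of that behaviour (a sketch; explain() assumes a reasonably recent Polars version):

import polars as pl

lf = pl.scan_parquet("yellow_tripdata/yellow_tripdata_2014-01.parquet")
print(type(lf))      # a LazyFrame: no data has been loaded yet
print(lf.explain())  # prints the query plan; execution only happens at collect()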

Transformation functions

def transformation(df_trips, df_zone):
    """
    Proceed to several transformations
    """
    df_trips = mean_test_speed_pl(df_trips)
    df = df_trips.join(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df.select(["Borough", "Zone", "trip_distance"])
    df = get_Queens_test_speed_pd(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df


def mean_test_speed_pl(df_pl):
    """
    Getting Mean per PULocationID
    """
    df_pl = df_pl.groupby("PULocationID").agg(pl.col(["trip_distance"]).mean())
    return df_pl


def get_Queens_test_speed_pd(df_pl):
    """
    Only getting Borough in Queens
    """
    df_pl = df_pl.filter(pl.col("Borough") == "Queens")
    return df_pl


def round_column(df, column, to_round):
    """
    Round numbers on columns
    """
    df = df.with_columns(pl.col(column).round(to_round))
    return df


def rename_column(df, column_old, column_new):
    """
    Renaming columns
    """
    df = df.rename({column_old: column_new})
    return df


def sort_by_columns_desc(df, column):
    """
    Sort by column
    """
    df = df.sort(column, descending=True)
    return df
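
The same transformation can also be written as a single chained lazy query. The sketch below is an equivalent rewrite of the helper functions above, not the code used in the benchmark:

import polars as pl


def transformation_chained(df_trips, df_zone):
    # Chained equivalent of transformation() above, using the Polars lazy API.
    return (
        df_trips.groupby("PULocationID")
        .agg(pl.col("trip_distance").mean())
        .join(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
        .select(["Borough", "Zone", "trip_distance"])
        .filter(pl.col("Borough") == "Queens")
        .with_columns(pl.col("trip_distance").round(2))
        .rename({"trip_distance": "mean_trip_distance"})
        .sort("mean_trip_distance", descending=True)
    )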

Saving

def loading_into_parquet(df_pl):
    """
    Save dataframe in parquet
    """
    df_pl.collect(streaming=True).write_parquet("yellow_tripdata_pl.parquet")
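
collect(streaming=True) still materializes the result before writing it. If the result never needs to live in memory, newer Polars versions also offer LazyFrame.sink_parquet as a streaming write; a hedged alternative (availability depends on the Polars version, and it was not used in the benchmark):

def loading_into_parquet_streaming(df_pl):
    # Stream the lazy result straight to disk without collecting it first;
    # requires a Polars version that supports LazyFrame.sink_parquet.
    df_pl.sink_parquet("yellow_tripdata_pl.parquet")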

Other code

import polars as pl
import time


def pl_read_parquet(path):
    """
    Converting parquet file into Polars dataframe
    """
    df = pl.scan_parquet(path)
    return df


def mean_test_speed_pl(df_pl):
    """
    Getting Mean per PULocationID
    """
    df_pl = df_pl.groupby("PULocationID").agg(pl.col(["trip_distance"]).mean())
    return df_pl


def get_Queens_test_speed_pd(df_pl):
    """
    Only getting Borough in Queens
    """
    df_pl = df_pl.filter(pl.col("Borough") == "Queens")
    return df_pl


def round_column(df, column, to_round):
    """
    Round numbers on columns
    """
    df = df.with_columns(pl.col(column).round(to_round))
    return df


def rename_column(df, column_old, column_new):
    """
    Renaming columns
    """
    df = df.rename({column_old: column_new})
    return df


def sort_by_columns_desc(df, column):
    """
    Sort by column
    """
    df = df.sort(column, descending=True)
    return df


def main():
    print("Starting ETL for Polars")
    start_time = time.perf_counter()
    print("Extracting...")
    df_trips, df_zone = extraction()
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = time.perf_counter() - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")
    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")
    print(f"End ETL for Polars in {time.perf_counter() - start_time}")


if __name__ == "__main__":
    main()
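
The timings reported below are averages over five runs. One simple way to automate that, reusing the main() above (the repeat count of 5 is an assumption matching the test setup described later):

import statistics
import time


def run_n_times(n=5):
    # Run the whole ETL n times and report the mean wall-clock time.
    durations = []
    for _ in range(n):
        start = time.perf_counter()
        main()
        durations.append(time.perf_counter() - start)
    print(f"Mean over {n} runs: {statistics.mean(durations):.3f} seconds")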
2. Dask

The functions do the same things as above, so the code is combined into a single script:

import dask.dataframe as dd
from dask.distributed import Client
import time


def extraction():
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = dd.read_parquet(path1)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = dd.read_parquet(path2)
    return df_trips, df_zone


def transformation(df_trips, df_zone):
    df_trips = mean_test_speed_dask(df_trips)
    df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df[["Borough", "Zone", "trip_distance"]]
    df = get_Queens_test_speed_dask(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df


def loading_into_parquet(df_dask):
    df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")


def mean_test_speed_dask(df_dask):
    df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})
    return df_dask


def get_Queens_test_speed_dask(df_dask):
    df_dask = df_dask[df_dask["Borough"] == "Queens"]
    return df_dask


def round_column(df, column, to_round):
    df[column] = df[column].round(to_round)
    return df


def rename_column(df, column_old, column_new):
    df = df.rename(columns={column_old: column_new})
    return df


def sort_by_columns_desc(df, column):
    df = df.sort_values(column, ascending=False)
    return df


def main():
    print("Starting ETL for Dask")
    start_time = time.perf_counter()
    client = Client()  # Start Dask Client
    df_trips, df_zone = extraction()
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = time.perf_counter() - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")
    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")
    print(f"End ETL for Dask in {time.perf_counter() - start_time}")
    client.close()  # Close Dask Client


if __name__ == "__main__":
    main()
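
Client() with no arguments starts a local cluster with default worker and thread counts. If you want to control parallelism explicitly, the local cluster can be sized when the client is created; the values below are illustrative, not the configuration used in these tests:

from dask.distributed import Client

# Illustrative sizing of the local cluster; the benchmark script uses the defaults.
client = Client(n_workers=4, threads_per_worker=2, memory_limit="4GB")
print(client.dashboard_link)  # dashboard URL for watching the task graph while it runs
client.close()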
Test results

1. Small dataset

We use a 164 MB dataset. A dataset of this size is small for our purposes and very common in day-to-day work.

Below are the results of running each library five times:

Polars

Dask

2. Medium dataset

We use a 1.1 GB dataset. A dataset in this range can still be loaded into memory in full, but it is considerably larger than the small one.

Polars

Dask

3. Large dataset

We use an 8 GB dataset. A dataset this large may not fit into memory in one go and has to be handled by the framework.

Polars

Dask

Summary

The results show that both Polars and Dask use lazy evaluation, so reading and transformation are very fast and their execution time barely changes with dataset size.
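
In both libraries the actual work is deferred until the final step, which is why extraction and transformation appear almost instant regardless of dataset size. A minimal illustration, assuming the same files as above:

import dask.dataframe as dd
import polars as pl

path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"

# Polars: building the plan is instant; work happens only at collect()/sink time.
lf = pl.scan_parquet(path1).filter(pl.col("trip_distance") > 0)
df = lf.collect()

# Dask: building the graph is instant; work happens only at compute()/to_parquet time.
ddf = dd.read_parquet(path1)
mean_distance = ddf["trip_distance"].mean().compute()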

Both libraries clearly handle medium-sized datasets very well.

Because both Polars and Dask run lazily, the figures below show the complete ETL (averaged over 5 runs).

Polars wins on both the small and the medium dataset. However, on the large dataset Dask takes the lead with an average time of 26 seconds.

This is probably related to Dask's parallel computing optimizations; according to the official documentation, "Dask tasks run three times faster than Spark ETL queries and use less CPU resources".

Above is the machine configuration used for the tests. Dask uses more CPU during computation, which suggests better parallelism.
