Linux command line tool + pipe 學習筆記之一:讓R 加入pipe的一環

最近我在幫General Mobile Corporation開發RTB系統。為了要處理有點大量,又沒有太大量的資料,與其直接用貴鬆鬆的AWS EMR(Elastic Map Reduce)去跑Apache Spark, 我選擇用基本的pipe與一些精巧的小程式,搭配nodejs與R來達成整理資料的目的。

動機則來自於很久以前讀過的文章:Command-line tools can be 235x faster than your Hadoop cluster以及過去自己維護過HDFS Cluster的經驗。至少在公司願意養一個full time engineer來維護Cluster,或是資料大到十億筆等級以上時,我覺得才有開始導入Apache Spark的必要性。

基本上,運用linux command line tool + pipe有幾個好處:

  • 高效
  • 省記憶體
  • 自動平行化

許多清理資料的功能,可以透過linux command line tool,搭配pipe的技巧做串接。這些小程式的效能,都是好的驚人的。與我自己簡單寫的c++程式做比較,這些小程式的效能大概還要快10倍。理由是因為,這些程式的優化都做得很深,所以效能可以海放我這種不是科班出身的工程師。

另一個特色是,pipe天生就會讓程式平行運作。在現在CPU都是多核的年代,一旦用pipe開發後,你就會看到一個CPU core在解壓縮,一個CPU core過濾資料,一個CPU core轉換資料。只要每個動作的效能差不多,就不用額外費心去做平行化。

前提條件:資料能乾淨的用一行來做單位

過去在前公司,我沒辦法盡興地使用Linux pipe tools的主因是,公司log的資料中,會有大量的斷行符號,導致處理時很麻煩。而在Gmobi Inc.時, 感謝同仁的配合,原始的log記錄是用ndjson的格式處理,所以用linux pipe style來做資料的前處理很方便。

以下我就用簡單的經驗,來介紹與記錄這陣子摸索的小工具們。

解壓縮起手式:zcat

為了節省雲端的硬碟用量,照慣例工程師都會用gz格式做壓縮,Gmobi也不例外。所以解壓縮工具:zcat就變成了起手式。

舉例來說,如果有一個檔案:bids.20160401.json.gz要處理,起手式就是:

zcat < bids.20160401.json.gz  

這裡的<符號,會讓作業系統把bids.20160401.json.gz的檔案內容,從stdin的管線入口,導入至zcat的程序(Process)。zcat接著會將資料解壓縮後,寫到stdout。如果後面沒有串接其他的程序,stdout的結果就會直接呈現在螢幕上,所以我就會看到整個解壓縮後的bids.20160401.json.gz的檔案內容。

如果我們只執行上述的指令,bids.20160401.json.gz不會有任何更動,也不會產生任何新的檔案,就只是把bids.20160401.json.gz的內容印到螢幕上。

開發的時候,常常要測試自己的程式。此時head就會很方便。舉例來說:

zcat < bids.20160401.json.gz | head -n 100  

就會高效率的只輸出檔案內容的前100筆資料,讓我們可以在很短的時間內做測試。

這裡|的符號,告訴作業系統將zcat的stdout串接到head的stdin,也就是把zcat處理後的輸出,當成head的輸入。而head程序只處理前100行(用-n參數控制),將內容輸入到stdout,之後就將程式關閉(連帶的zcat也會跟著關閉)。

利用R 來處理stdin的資料

接著,我們可以用R 寫一個命令列應用,如:

#! /usr/bin/env Rscript --vanilla
f <- file("stdin")  
open(f)  
while(length(line <- readLines(f, n = 1)) > 0) {  
  ## do something with line
  write(nchar(line), stdout())
}

將檔案存到nchar.R後,我們就可以輸入:

zcat < bids.20160401.json.gz | head -n 100 | Rscript nchar.R  

如果執行這段程式碼的話,螢幕上就會印出bids.20160401.json.gz中每一行的字元數量。

這裡的原理,是因為R透過f這個變數,打開stdin,並且用readLines一行一行的將stdin的結果輸入至變數line

接著,我們再將nchar(line)的結果寫入到stdout()

透過這樣的方式,只要修改註解的部分,甚至是輸出的部分,就可以將R一併串入pipe style之中。

以下我想分享一些,我會想用pipe style處理資料的情境:

情境:處理超過記憶體的檔案

假設我們有一個很大的csv檔案,超過了我們電腦的記憶體。 這個檔案正常來說應該要有10個欄位。 但是其中有某幾個line中,可能多出一些,導致讀出來的欄位錯亂。 為了排除這樣的資料錯誤,我們想先捨棄掉,的數量不同的資料後,再用read.csv處理。

readLines切割後再輸出

最直接的方式,就是用readLines將資料讀出來後,丟掉那些欄位數量不是10的資料:

x <- readLines("big.csv")  
x2 <- strsplit(x, ",")  
write(x[sapply(x2, length) == 10], file = "corrected.csv") # 這些就是符合條件的資料  

其實這應該是在R 裡面最快的方法,只是如果資料超過記憶體,以上的方法就會... 讓你的滑鼠不能動

ps. R 有另一個count.field的指令可以快速計算column的個數

利用chunk一段一段處理

readLines其實可以分批載入檔案,所以我們可以一次處理個10000行,然後把結果暫時存到另一個檔案中

f <- file("big.csv")  
f2 <- file("corrected.csv", "w")  
open(f)  
open(f2)  
while(length(x <- readLines(f, n = 10000)) > 0) {  
  x2 <- strsplit(x, ",")
  write(x[sapply(x2, length) == 10], f2) # 這些就是符合條件的資料
}
close(f2)  

利用stdin與stdout

如果掌握stdin與stdout,我們可以就寫出一個檔案:filter.R

#! /usr/bin/env Rscript --vanilla
f <- file("stdin")  
open(f)  
while(length(x <- readLines(f, n = 10000)) > 0) {  
  ## do something with line
  x2 <- strsplit(x, ",")
  write(x[sapply(x2, length) == 10], stdout())
}

然後下指令:

Rscript filter.R < big.csv > corrected.csv  

後面兩種方式都能夠在有限的記憶體下去處理超過記憶體大小的資料。 訣竅在於,每次只讀出一小部分的資料做處理,處理後就扔掉。 但是如果掌握了許多高效能的linux command line tool,就可以更有效率地完成這樣的工作。

利用 awk

awk是一個功能滿複雜的linux command line tool,但是它可以用十倍,甚至百倍以上的速度,完成我們上面的功能。

根據google後,我們可以查到以下的語法:

awk -F , 'NF == 10' < big.csv > corrected.csv  

在我的Mac Book Air電腦上,處理一個1e6筆資料做實驗

  • 第一種方法readLines + strsplit花了18秒
  • 第二種方法readLines + count.fields花了10秒
  • 第三種方法readLines + strsplit with chunk style大概花18秒左右,沒有變慢(但是不用一次處理全部的資料,所以記憶體需求低)
  • 第四種方法filter.R + stdin, stdout 大概花了18秒左右,
  • 第五種方法awk,大概1.9秒,快了十倍

pipe的用處

實務上,R的好處就是彈性,可以在R裡面完成許多複雜的功能。例如,先把資料解壓縮(用gzfile),只拿包含特定字串的資料... 等等

linux command line tool呢,則是一次處理一個功能,但是搭配pipe,也可以完成複雜的功能。舉例來說,如果檔案是gzip壓縮過的話呢?只要用pipe |串接zcatawk就可以了。

zcat < big.csv.gz | awk -F , 'NF == 10' > corrected.csv  

由於這些工具的指令也滿複雜的,我自己是無法一次全學,所以常常會混用linux commandline tool與R。利用R來完成那些,複雜到我不知道怎麼用linux commandline tool來完成的功能。

以上就是一個小小的心得筆記。