2023-07-11 11:17:38來源:Java極客技術(shù)
在計(jì)算機(jī)中,IO 傳輸數(shù)據(jù)有三種工作方式,分別是:BIO、NIO、AIO。
(相關(guān)資料圖)
在講解BIO、NIO、AIO之前,我們先來回顧一下這幾個(gè)概念:同步與異步,阻塞與非阻塞。
同步與異步的區(qū)別
同步就是發(fā)起一個(gè)請(qǐng)求后,接受者未處理完請(qǐng)求之前,不返回結(jié)果。異步就是發(fā)起一個(gè)請(qǐng)求后,立刻得到接受者的回應(yīng)表示已接收到請(qǐng)求,但是接受者并沒有處理完,接受者通常依靠事件回調(diào)等機(jī)制來通知請(qǐng)求者其處理結(jié)果。阻塞和非阻塞的區(qū)別
阻塞就是請(qǐng)求者發(fā)起一個(gè)請(qǐng)求,一直等待其請(qǐng)求結(jié)果返回,也就是當(dāng)前線程會(huì)被掛起,無法從事其他任務(wù),只有當(dāng)條件就緒才能繼續(xù)。非阻塞就是請(qǐng)求者發(fā)起一個(gè)請(qǐng)求,不用一直等著結(jié)果返回,可以先去干其他事情,當(dāng)條件就緒的時(shí)候,就自動(dòng)回來。而我們要講的BIO、NIO、AIO就是同步與異步、阻塞與非阻塞的組合。
BIO:同步阻塞 IO;NIO:同步非阻塞 IO;AIO:異步非阻塞 IO;不同的工作方式,帶來的傳輸效率是不一樣的,下面我們以網(wǎng)絡(luò) IO 為例,一起看看不同的工作方式下,彼此之間有何不同。
二、BIOBIO 俗稱同步阻塞 IO,是一種非常傳統(tǒng)的 IO 模型,也是最常用的網(wǎng)絡(luò)數(shù)據(jù)傳輸處理方式,優(yōu)點(diǎn)就是編程簡單,但是缺點(diǎn)也很明顯,I/O 傳輸性能一般比較差,CPU 大部分處于空閑狀態(tài)。
采用 BIO 通信模型的服務(wù)端,通常由一個(gè)獨(dú)立的Acceptor線程負(fù)責(zé)監(jiān)聽所有客戶端的連接,當(dāng)服務(wù)端接受到多個(gè)客戶端的請(qǐng)求時(shí),所有的客戶端只能排隊(duì)等待服務(wù)端一個(gè)一個(gè)的處理。
BIO 通信模型圖如下!
圖片
一般在服務(wù)端通過while(true)循環(huán)中會(huì)調(diào)用accept()方法監(jiān)聽客戶端的連接,一旦接收到一個(gè)連接請(qǐng)求,就可以建立通信套接字進(jìn)行讀寫操作,此時(shí)不能再接收其他客戶端連接請(qǐng)求,只能等待同當(dāng)前連接的客戶端的操作執(zhí)行完成。
服務(wù)端操作,樣例程序如下:
public class BioServerTest { public static void main(String[] args) throws IOException { //初始化服務(wù)端socket并且綁定 8080 端口 ServerSocket serverSocket = new ServerSocket(8080); //循環(huán)監(jiān)聽客戶端請(qǐng)求 while (true){ try { //監(jiān)聽客戶端請(qǐng)求 Socket socket = serverSocket.accept(); //將字節(jié)流轉(zhuǎn)化成字符流,讀取客戶端輸入的內(nèi)容 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //讀取一行數(shù)據(jù) String str = bufferedReader.readLine(); //打印客戶端發(fā)送的信息 System.out.println("服務(wù)端收到客戶端發(fā)送的信息:" + str); //向客戶端返回信息,將字符轉(zhuǎn)化成字節(jié)流,并輸出 PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()),true); printWriter.println("hello,我是服務(wù)端,已收到消息"); // 關(guān)閉流 bufferedReader.close(); printWriter.close(); } catch (IOException e) { e.printStackTrace(); } } }}
客戶端操作,樣例程序如下:
public class BioClientTest { public static void main(String[] args) { //創(chuàng)建10個(gè)線程,模擬10個(gè)客戶端,同時(shí)向服務(wù)端發(fā)送請(qǐng)求 for (int i = 0; i < 10; i++) { final int j = i;//定義變量 new Thread(new Runnable() { @Override public void run() { try { //通過IP和端口與服務(wù)端建立連接 Socket socket =new Socket("127.0.0.1",8080); //將字符流轉(zhuǎn)化成字節(jié)流,并輸出 PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()),true); String str="Hello,我是" + j + "個(gè),客戶端!"; printWriter.println(str); //從輸入流中讀取服務(wù)端返回的信息,將字節(jié)流轉(zhuǎn)化成字符流 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //讀取內(nèi)容 String result = bufferedReader.readLine(); //打印服務(wù)端返回的信息 System.out.println("客戶端發(fā)送請(qǐng)求內(nèi)容:" + str + " -> 收到服務(wù)端返回的內(nèi)容:" + result); // 關(guān)閉流 bufferedReader.close(); printWriter.close(); // 關(guān)閉socket socket.close(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } }}
最后,依次啟動(dòng)服務(wù)端、客戶端,看看控制臺(tái)輸出情況如何。
服務(wù)端控制臺(tái)結(jié)果如下:
服務(wù)端收到客戶端發(fā)送的信息:Hello,我是8個(gè),客戶端!服務(wù)端收到客戶端發(fā)送的信息:Hello,我是9個(gè),客戶端!服務(wù)端收到客戶端發(fā)送的信息:Hello,我是7個(gè),客戶端!服務(wù)端收到客戶端發(fā)送的信息:Hello,我是5個(gè),客戶端!服務(wù)端收到客戶端發(fā)送的信息:Hello,我是4個(gè),客戶端!服務(wù)端收到客戶端發(fā)送的信息:Hello,我是3個(gè),客戶端!服務(wù)端收到客戶端發(fā)送的信息:Hello,我是6個(gè),客戶端!服務(wù)端收到客戶端發(fā)送的信息:Hello,我是2個(gè),客戶端!服務(wù)端收到客戶端發(fā)送的信息:Hello,我是1個(gè),客戶端!服務(wù)端收到客戶端發(fā)送的信息:Hello,我是0個(gè),客戶端!
客戶端控制臺(tái)結(jié)果如下:
客戶端發(fā)送請(qǐng)求內(nèi)容:Hello,我是8個(gè),客戶端! -> 收到服務(wù)端返回的內(nèi)容:hello,我是服務(wù)端,已收到消息客戶端發(fā)送請(qǐng)求內(nèi)容:Hello,我是9個(gè),客戶端! -> 收到服務(wù)端返回的內(nèi)容:hello,我是服務(wù)端,已收到消息客戶端發(fā)送請(qǐng)求內(nèi)容:Hello,我是7個(gè),客戶端! -> 收到服務(wù)端返回的內(nèi)容:hello,我是服務(wù)端,已收到消息客戶端發(fā)送請(qǐng)求內(nèi)容:Hello,我是5個(gè),客戶端! -> 收到服務(wù)端返回的內(nèi)容:hello,我是服務(wù)端,已收到消息客戶端發(fā)送請(qǐng)求內(nèi)容:Hello,我是4個(gè),客戶端! -> 收到服務(wù)端返回的內(nèi)容:hello,我是服務(wù)端,已收到消息客戶端發(fā)送請(qǐng)求內(nèi)容:Hello,我是3個(gè),客戶端! -> 收到服務(wù)端返回的內(nèi)容:hello,我是服務(wù)端,已收到消息客戶端發(fā)送請(qǐng)求內(nèi)容:Hello,我是6個(gè),客戶端! -> 收到服務(wù)端返回的內(nèi)容:hello,我是服務(wù)端,已收到消息客戶端發(fā)送請(qǐng)求內(nèi)容:Hello,我是2個(gè),客戶端! -> 收到服務(wù)端返回的內(nèi)容:hello,我是服務(wù)端,已收到消息客戶端發(fā)送請(qǐng)求內(nèi)容:Hello,我是1個(gè),客戶端! -> 收到服務(wù)端返回的內(nèi)容:hello,我是服務(wù)端,已收到消息客戶端發(fā)送請(qǐng)求內(nèi)容:Hello,我是0個(gè),客戶端! -> 收到服務(wù)端返回的內(nèi)容:hello,我是服務(wù)端,已收到消息
隨著客戶端的請(qǐng)求次數(shù)越來越多,可能需要排隊(duì)的時(shí)間會(huì)越來越長,因此是否可以在服務(wù)端,采用多線程編程進(jìn)行處理呢?
答案是,可以的!
下面我們對(duì)服務(wù)端的代碼進(jìn)行改造,服務(wù)端多線程操作,樣例程序如下:
public class BioServerTest { public static void main(String[] args) throws IOException { //初始化服務(wù)端socket并且綁定 8080 端口 ServerSocket serverSocket = new ServerSocket(8080); //循環(huán)監(jiān)聽客戶端請(qǐng)求 while (true){ //監(jiān)聽客戶端請(qǐng)求 Socket socket = serverSocket.accept(); new Thread(new Runnable() { @Override public void run() { try { String threadName = Thread.currentThread().toString(); //將字節(jié)流轉(zhuǎn)化成字符流,讀取客戶端輸入的內(nèi)容 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //讀取一行數(shù)據(jù) String str = bufferedReader.readLine(); //打印客戶端發(fā)送的信息 System.out.println("線程名稱" + threadName + ",服務(wù)端收到客戶端發(fā)送的信息:" + str); //向客戶端返回信息,將字符轉(zhuǎn)化成字節(jié)流,并輸出 PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()),true); printWriter.println("hello,我是服務(wù)端,已收到消息"); // 關(guān)閉流 bufferedReader.close(); printWriter.close(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } }}
依次啟動(dòng)服務(wù)端、客戶端,服務(wù)端控制臺(tái)輸出結(jié)果如下:
線程名稱Thread[Thread-8,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是4個(gè),客戶端!線程名稱Thread[Thread-4,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是8個(gè),客戶端!線程名稱Thread[Thread-0,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是1個(gè),客戶端!線程名稱Thread[Thread-7,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是5個(gè),客戶端!線程名稱Thread[Thread-5,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是2個(gè),客戶端!線程名稱Thread[Thread-9,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是3個(gè),客戶端!線程名稱Thread[Thread-1,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是0個(gè),客戶端!線程名稱Thread[Thread-3,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是7個(gè),客戶端!線程名稱Thread[Thread-2,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是9個(gè),客戶端!線程名稱Thread[Thread-6,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是6個(gè),客戶端!
當(dāng)服務(wù)端接收到客戶端的請(qǐng)求時(shí),會(huì)給每個(gè)客戶端創(chuàng)建一個(gè)新的線程進(jìn)行鏈路處理,處理完成之后,通過輸出流返回應(yīng)答給客戶端,最后線程會(huì)銷毀。
但是這樣的編程模型也有很大的弊端,如果出現(xiàn) 100、1000、甚至 10000 個(gè)客戶端同時(shí)請(qǐng)求服務(wù)端,采用這種編程模型,服務(wù)端也會(huì)創(chuàng)建與之相同的線程數(shù)量,線程數(shù)急劇膨脹可能會(huì)導(dǎo)致線程堆棧溢出、創(chuàng)建新線程失敗等問題,最終可能導(dǎo)致服務(wù)端宕機(jī)或者僵死,不能對(duì)外提供服務(wù)。
三、偽異步 BIO為了解決上面提到的同步阻塞 I/O 面臨的一個(gè)鏈路需要一個(gè)線程處理的問題,后來有人對(duì)它的編程模型進(jìn)行了優(yōu)化。
在服務(wù)端通過使用 Java 中ThreadPoolExecutor線程池機(jī)制來處理多個(gè)客戶端的請(qǐng)求接入,防止由于海量并發(fā)接入導(dǎo)致資源耗盡,讓線程的創(chuàng)建和回收成本相對(duì)較低,保證了系統(tǒng)有限的資源得以控制,實(shí)現(xiàn)了 N (客戶端請(qǐng)求數(shù)量)大于 M (服務(wù)端處理客戶端請(qǐng)求的線程數(shù)量)的偽異步 I/O 模型。
偽異步 IO 模型圖,如下圖:
圖片
采用線程池和任務(wù)隊(duì)列可以實(shí)現(xiàn)一種叫做偽異步的 I/O 通信框架,當(dāng)有新的客戶端接入時(shí),將客戶端的 Socket 封裝成一個(gè) Task 投遞到線程池中進(jìn)行處理。
服務(wù)端采用線程池處理客戶端請(qǐng)求,樣例程序如下:
public class BioServerTest { public static void main(String[] args) throws IOException { //在線程池中創(chuàng)建5個(gè)固定大小線程,來處理客戶端的請(qǐng)求 ExecutorService executorService = Executors.newFixedThreadPool(5); //初始化服務(wù)端socket并且綁定 8080 端口 ServerSocket serverSocket = new ServerSocket(8080); //循環(huán)監(jiān)聽客戶端請(qǐng)求 while (true){ //監(jiān)聽客戶端請(qǐng)求 Socket socket = serverSocket.accept(); //使用線程池執(zhí)行任務(wù) executorService.execute(new Runnable() { @Override public void run() { try { String threadName = Thread.currentThread().toString(); //將字節(jié)流轉(zhuǎn)化成字符流,讀取客戶端輸入的內(nèi)容 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //讀取一行數(shù)據(jù) String str = bufferedReader.readLine(); //打印客戶端發(fā)送的信息 System.out.println("線程名稱" + threadName + ",服務(wù)端收到客戶端發(fā)送的信息:" + str); //向客戶端返回信息,將字符轉(zhuǎn)化成字節(jié)流,并輸出 PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()),true); printWriter.println("hello,我是服務(wù)端,已收到消息"); // 關(guān)閉流 bufferedReader.close(); printWriter.close(); } catch (IOException e) { e.printStackTrace(); } } }); } }}
依次啟動(dòng)服務(wù)端、客戶端,服務(wù)端控制臺(tái)輸出結(jié)果如下:
線程名稱Thread[pool-1-thread-4,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是6個(gè),客戶端!線程名稱Thread[pool-1-thread-2,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是8個(gè),客戶端!線程名稱Thread[pool-1-thread-3,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是9個(gè),客戶端!線程名稱Thread[pool-1-thread-5,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是5個(gè),客戶端!線程名稱Thread[pool-1-thread-1,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是7個(gè),客戶端!線程名稱Thread[pool-1-thread-5,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是2個(gè),客戶端!線程名稱Thread[pool-1-thread-5,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是0個(gè),客戶端!線程名稱Thread[pool-1-thread-1,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是1個(gè),客戶端!線程名稱Thread[pool-1-thread-5,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是3個(gè),客戶端!線程名稱Thread[pool-1-thread-1,5,main],服務(wù)端收到客戶端發(fā)送的信息:Hello,我是4個(gè),客戶端!
本例中測(cè)試的客戶端數(shù)量是 10,服務(wù)端使用 java 線程池來處理任務(wù),線程數(shù)量為 5 個(gè),服務(wù)端不用為每個(gè)客戶端都創(chuàng)建一個(gè)線程,由于線程池可以設(shè)置消息隊(duì)列的大小和最大線程數(shù),因此它的資源占用是可控的,無論多少個(gè)客戶端并發(fā)訪問,都不會(huì)導(dǎo)致資源的耗盡和宕機(jī)。
在活動(dòng)連接數(shù)不是特別高的情況下,這種模型還是不錯(cuò)的,可以讓每一個(gè)連接專注于自己的 I/O 并且編程模型簡單,也不用過多考慮系統(tǒng)的過載、限流等問題。
但是,它的底層仍然是同步阻塞的 BIO 模型,當(dāng)面對(duì)十萬甚至百萬級(jí)請(qǐng)求接入的時(shí)候,傳統(tǒng)的 BIO 模型無能為力,因此我們需要一種更高效的 I/O 處理模型來應(yīng)對(duì)更高的并發(fā)量。
四、NIONIO,英文全稱:Non-blocking-IO,一種同步非阻塞的 I/O 模型。
在 Java 1.4 中引入,對(duì)應(yīng)的代碼在java.nio包下。
與傳統(tǒng)的 IO 不同,NIO 新增了Channel、Selector、Buffer等抽象概念,支持面向緩沖、基于通道的 I/O 數(shù)據(jù)傳輸方法。
NIO 模型圖,如下圖:
圖片
與此同時(shí),NIO 還提供了與傳統(tǒng) BIO 模型中的Socket和ServerSocket相對(duì)應(yīng)的SocketChannel和ServerSocketChannel兩種不同的套接字通道實(shí)現(xiàn)。
NIO 這兩種通道都支持阻塞和非阻塞兩種模式。阻塞模式使用就像傳統(tǒng)中的 BIO 一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。
對(duì)于低負(fù)載、低并發(fā)的應(yīng)用程序,可以使用同步阻塞 I/O 來提升開發(fā)效率和更好的維護(hù)性;對(duì)于高負(fù)載、高并發(fā)的(網(wǎng)絡(luò))應(yīng)用,使用 NIO 的非阻塞模式來開發(fā)可以顯著的提升數(shù)據(jù)傳輸效率。
在介紹樣例之前,我們先看一下 NIO 涉及到的核心關(guān)聯(lián)類圖,如下:
圖片
上圖中有三個(gè)關(guān)鍵類:Channel 、Selector 和 Buffer,它們是 NIO 中的核心概念。
Channel:可以理解為通道;Selector:可以理解為選擇器;Buffer:可以理解為數(shù)據(jù)緩沖區(qū);從名詞上看感覺很抽象,我們還是用之前介紹的城市交通工具來繼續(xù)形容 NIO 的工作方式,這里的Channel要比Socket更加具體,它可以比作為某種具體的交通工具,如汽車或是高鐵、飛機(jī)等,而Selector可以比作為一個(gè)車站的車輛運(yùn)行調(diào)度系統(tǒng),它將負(fù)責(zé)監(jiān)控每輛車的當(dāng)前運(yùn)行狀態(tài),是已經(jīng)出站還是在路上等等,也就是說它可以輪詢每個(gè)Channel的狀態(tài)。
還有一個(gè)Buffer類,你可以將它看作為 IO 中Stream,但是它比 IO 中的Stream更加具體化,我們可以將它比作為車上的座位,Channel如果是汽車的話,那么Buffer就是汽車上的座位,Channel如果是高鐵上,那么Buffer就是高鐵上的座位,它始終是一個(gè)具體的概念,這一點(diǎn)與Stream不同。
Socket 中的 Stream只能代表是一個(gè)座位,至于是什么座位由你自己去想象,也就是說你在上車之前并不知道這個(gè)車上是否還有座位,也不知道上的是什么車,因?yàn)槟悴⒉荒苓x擇,這些信息都已經(jīng)被封裝在了運(yùn)輸工具(Socket)里面了。
NIO 引入了Channel、Buffer 和 Selector就是想把 IO 傳輸過程中涉及到的信息具體化,讓程序員有機(jī)會(huì)去控制它們。
當(dāng)我們進(jìn)行傳統(tǒng)的網(wǎng)絡(luò) IO 操作時(shí),比如調(diào)用write()往 Socket 中的SendQ隊(duì)列寫數(shù)據(jù)時(shí),當(dāng)一次寫的數(shù)據(jù)超過SendQ長度時(shí),操作系統(tǒng)會(huì)按照SendQ的長度進(jìn)行分割的,這個(gè)過程中需要將用戶空間數(shù)據(jù)和內(nèi)核地址空間進(jìn)行切換,而這個(gè)切換不是程序員可以控制的,由底層操作系統(tǒng)來幫我們處理。
而在Buffer中,我們可以控制Buffer的capacity(容量),并且是否擴(kuò)容以及如何擴(kuò)容都可以控制。
理解了這些概念后我們看一下,實(shí)際上它們是如何工作的呢?
我們一起來看看代碼實(shí)例!
服務(wù)端操作,樣例程序如下:
/** * NIO 服務(wù)端 */public class NioServerTest { public static void main(String[] args) throws IOException { // 打開服務(wù)器套接字通道 ServerSocketChannel ssc = ServerSocketChannel.open(); // 服務(wù)器配置為非阻塞 ssc.configureBlocking(false); // 進(jìn)行服務(wù)的綁定,監(jiān)聽8080端口 ssc.socket().bind(new InetSocketAddress(8080)); // 構(gòu)建一個(gè)Selector選擇器,并且將channel注冊(cè)上去 Selector selector = Selector.open(); // 將serverSocketChannel注冊(cè)到selector,并對(duì)accept事件感興趣(serverSocketChannel只能支持accept操作) ssc.register(selector, SelectionKey.OP_ACCEPT); while (true){ // 查詢指定事件已經(jīng)就緒的通道數(shù)量,select方法有阻塞效果,直到有事件通知才會(huì)有返回,如果為0就跳過 int readyChannels = selector.select(); if(readyChannels == 0) { continue; }; //通過選擇器取得所有key集合 Set selectedKeys = selector.selectedKeys(); Iterator iterator = selectedKeys.iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); //判斷狀態(tài)是否有效 if (!key.isValid()) { continue; } if (key.isAcceptable()) { // 處理通道中的連接事件 ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel sc = server.accept(); sc.configureBlocking(false); System.out.println("接收到新的客戶端連接,地址:" + sc.getRemoteAddress()); // 將通道注冊(cè)到選擇器并處理通道中可讀事件 sc.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { // 處理通道中的可讀事件 SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while (channel.isOpen() && channel.read(byteBuffer) != -1) { // 長連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個(gè)簡單的判斷: 超過0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了) if (byteBuffer.position() > 0) { break; }; } byteBuffer.flip(); //獲取緩沖中的數(shù)據(jù) String result = new String(byteBuffer.array(), 0, byteBuffer.limit()); System.out.println("收到客戶端發(fā)送的信息,內(nèi)容:" + result); // 將通道注冊(cè)到選擇器并處理通道中可寫事件 channel.register(selector, SelectionKey.OP_WRITE); } else if (key.isWritable()) { // 處理通道中的可寫事件 SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.put("server send".getBytes()); byteBuffer.flip(); channel.write(byteBuffer); // 將通道注冊(cè)到選擇器并處理通道中可讀事件 channel.register(selector, SelectionKey.OP_READ); //寫完之后關(guān)閉通道 channel.close(); } //當(dāng)前事件已經(jīng)處理完畢,可以丟棄 iterator.remove(); } } }}
客戶端操作,樣例程序如下:
/** * NIO 客戶端 */public class NioClientTest { public static void main(String[] args) throws IOException { // 打開socket通道 SocketChannel sc = SocketChannel.open(); //設(shè)置為非阻塞 sc.configureBlocking(false); //連接服務(wù)器地址和端口 sc.connect(new InetSocketAddress("127.0.0.1", 8080)); while (!sc.finishConnect()) { // 沒連接上,則一直等待 System.out.println("客戶端正在連接中,請(qǐng)耐心等待"); } // 發(fā)送內(nèi)容 ByteBuffer writeBuffer = ByteBuffer.allocate(1024); writeBuffer.put("Hello,我是客戶端".getBytes()); writeBuffer.flip(); sc.write(writeBuffer); // 讀取響應(yīng) ByteBuffer readBuffer = ByteBuffer.allocate(1024); while (sc.isOpen() && sc.read(readBuffer) != -1) { // 長連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個(gè)簡單的判斷: 超過0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了) if (readBuffer.position() > 0) { break; }; } readBuffer.flip(); String result = new String(readBuffer.array(), 0, readBuffer.limit()); System.out.println("客戶端收到服務(wù)端:" + sc.socket().getRemoteSocketAddress() + ",返回的信息:" + result); // 關(guān)閉通道 sc.close(); }}
最后,依次啟動(dòng)服務(wù)端、客戶端,看看控制臺(tái)輸出情況如何。
服務(wù)端控制臺(tái)結(jié)果如下:
接收到新的客戶端連接,地址:/127.0.0.1:57644收到客戶端發(fā)送的信息,內(nèi)容:Hello,我是客戶端
客戶端控制臺(tái)結(jié)果如下:
客戶端收到服務(wù)端:/127.0.0.1:8080,返回的信息:server send
從編程上可以看到,NIO 的操作比傳統(tǒng)的 IO 操作要復(fù)雜的多!
Selector被稱為選擇器,當(dāng)然你也可以翻譯為多路復(fù)用器。它是Java NIO 核心組件中的一個(gè),用于檢查一個(gè)或多個(gè)Channel(通道)的狀態(tài)是否處于連接就緒、接受就緒、可讀就緒、可寫就緒。
如此可以實(shí)現(xiàn)單線程管理多個(gè)channels的目的,也就是可以管理多個(gè)網(wǎng)絡(luò)連接。
使用 Selector 的好處在于:相比傳統(tǒng)方式使用多個(gè)線程來管理 IO,Selector 使用了更少的線程就可以處理通道了,并且實(shí)現(xiàn)網(wǎng)絡(luò)高效傳輸!
雖然 Java 中的 nio 傳輸比較快,為什么大家都不愿意用 JDK 原生 NIO 進(jìn)行開發(fā)呢?
從上面的代碼中大家都可以看出來,除了編程復(fù)雜之外,還有幾個(gè)讓人詬病的問題:
JDK 的 NIO 底層由 epoll 實(shí)現(xiàn),該實(shí)現(xiàn)飽受詬病的空輪詢 bug 會(huì)導(dǎo)致 cpu 飆升 100%!項(xiàng)目龐大之后,自行實(shí)現(xiàn)的 NIO 很容易出現(xiàn)各類 bug,維護(hù)成本較高!但是,Google 的 Netty 框架的出現(xiàn),很大程度上改善了 JDK 原生 NIO 所存在的一些讓人難以忍受的問題,關(guān)于 Netty 框架應(yīng)用,會(huì)在后期的文章里進(jìn)行介紹。
五、AIO最后就是 AIO 了,全稱 Asynchronous I/O,可以理解為異步 IO,也被稱為 NIO 2,在 Java 7 中引入,它是異步非阻塞的 IO 模型。
異步 IO 是基于事件回調(diào)機(jī)制實(shí)現(xiàn)的,也就是應(yīng)用操作之后會(huì)直接返回,不會(huì)堵塞在那里,當(dāng)后臺(tái)處理完成,操作系統(tǒng)會(huì)通知相應(yīng)的線程進(jìn)行后續(xù)的操作。
具體的實(shí)例如下!
服務(wù)端操作,樣例程序如下:
/** * aio 服務(wù)端 */public class AioServer { public AsynchronousServerSocketChannel serverChannel; /** * 監(jiān)聽客戶端請(qǐng)求 * @throws Exception */ public void listen() throws Exception { //打開一個(gè)服務(wù)端通道 serverChannel = AsynchronousServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress(8080));//監(jiān)聽8080端口 //服務(wù)監(jiān)聽 serverChannel.accept(this, new CompletionHandler(){ @Override public void completed(AsynchronousSocketChannel client, AioServer attachment) { try { if (client.isOpen()) { System.out.println("接收到新的客戶端連接,地址:" + client.getRemoteAddress()); final ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取客戶端發(fā)送的信息 client.read(buffer, client, new CompletionHandler(){ @Override public void completed(Integer result, AsynchronousSocketChannel attachment) { try { //讀取請(qǐng)求,處理客戶端發(fā)送的數(shù)據(jù) buffer.flip(); String content = new String(buffer.array(), 0, buffer.limit()); System.out.println("服務(wù)端收到客戶端發(fā)送的信息:" + content); //向客戶端發(fā)送數(shù)據(jù) ByteBuffer writeBuffer = ByteBuffer.allocate(1024); writeBuffer.put("server send".getBytes()); writeBuffer.flip(); attachment.write(writeBuffer).get(); } catch (Exception e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, AsynchronousSocketChannel attachment) { try { exc.printStackTrace(); attachment.close(); } catch (IOException e) { e.printStackTrace(); } } }); } } catch (Exception e) { e.printStackTrace(); } finally { //當(dāng)有新客戶端接入的時(shí)候,直接調(diào)用accept方法,遞歸執(zhí)行下去,保證多個(gè)客戶端都可以阻塞 attachment.serverChannel.accept(attachment, this); } } @Override public void failed(Throwable exc, AioServer attachment) { exc.printStackTrace(); } }); } public static void main(String[] args) throws Exception { //啟動(dòng)服務(wù)器,并監(jiān)聽客戶端 new AioServer().listen(); //因?yàn)槭钱惒絀O執(zhí)行,讓主線程睡眠但不關(guān)閉 Thread.sleep(Integer.MAX_VALUE); }}
客戶端操作,樣例程序如下:
/** * aio 客戶端 */public class AioClient { public static void main(String[] args) throws IOException, InterruptedException { //打開一個(gè)客戶端通道 AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(); //與服務(wù)器建立連接 channel.connect(new InetSocketAddress("127.0.0.1", 8080)); //睡眠1s,等待與服務(wù)器建立連接 Thread.sleep(1000); try { //向服務(wù)器發(fā)送數(shù)據(jù) channel.write(ByteBuffer.wrap("Hello,我是客戶端".getBytes())).get(); } catch (Exception e) { e.printStackTrace(); } try { //從服務(wù)器讀取數(shù)據(jù) ByteBuffer byteBuffer = ByteBuffer.allocate(1024); channel.read(byteBuffer).get();//將通道中的數(shù)據(jù)寫入緩沖buffer byteBuffer.flip(); String result = new String(byteBuffer.array(), 0, byteBuffer.limit()); System.out.println("客戶端收到服務(wù)器返回的內(nèi)容:" + result);//輸出返回結(jié)果 } catch (Exception e) { e.printStackTrace(); } }}
同樣的,依次啟動(dòng)服務(wù)端程序,再啟動(dòng)客戶端程序,看看運(yùn)行結(jié)果!
服務(wù)端控制臺(tái)結(jié)果如下:
接收到新的客戶端連接,地址:/127.0.0.1:56606服務(wù)端收到客戶端發(fā)送的信息:Hello,我是客戶端
客戶端控制臺(tái)結(jié)果如下:
客戶端收到服務(wù)器返回的內(nèi)容:server send
這種組合方式用起來十分復(fù)雜,只有在一些非常復(fù)雜的分布式情況下使用,像集群之間的消息同步機(jī)制一般用這種 I/O 組合方式。如 Cassandra 的 Gossip 通信機(jī)制就是采用異步非阻塞的方式,可以實(shí)現(xiàn)非常高的網(wǎng)絡(luò)傳輸性能。
Netty 之前也嘗試使用過 AIO,不過又放棄了!
六、小結(jié)本文主要圍繞 BIO、NIO、AIO 等模型,結(jié)合一些樣例代碼,做了一次簡單的內(nèi)容知識(shí)總結(jié),希望對(duì)大家有所幫助。
內(nèi)容難免有所遺漏,歡迎留言指出!
七、參考1、JDK1.7&JDK1.8 源碼
2、IBM - 許令波 -深入分析 Java I/O 的工作機(jī)制
3、Github - JavaGuide - IO總結(jié)
4、博客園 - 五月的倉頡 - IO和File
關(guān)鍵詞:
一、簡介在計(jì)算機(jī)中,IO傳輸數(shù)據(jù)有三種工作方式,分別是:BIO、NIO、AI
人工智能目前已經(jīng)應(yīng)用在檢測(cè)惡意軟件、保護(hù)敏感數(shù)據(jù)、識(shí)別和應(yīng)對(duì)網(wǎng)絡(luò)威
小伙伴們知道,當(dāng)我們使用Spring容器的時(shí)候,如果遇到一些特殊的Bean,
昨天是2023年女足世界杯倒計(jì)時(shí)10天,中國女足隊(duì)員王霜當(dāng)日從美國飛抵澳
佛山日?qǐng)?bào)訊記者江樂詩攝影報(bào)道:暑假開始,如何讓孩子度過一段精彩充實(shí)
設(shè)計(jì)模式(Designpattern)代表了最佳的實(shí)踐,通常被有經(jīng)驗(yàn)的面向?qū)ο蟮能?/p>
一、線程池的實(shí)現(xiàn)原理下圖所示為線程池的實(shí)現(xiàn)原理:調(diào)用方不斷地向線程
本文經(jīng)AI新媒體量子位(公眾號(hào)ID:QbitAI)授權(quán)轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)聯(lián)系出處。
開放的網(wǎng)絡(luò)端口是網(wǎng)絡(luò)最簡單的接入點(diǎn)。很多時(shí)候,我們需要在從Internet
一、背景與行業(yè)現(xiàn)狀1、數(shù)據(jù)湖理解的幾個(gè)誤區(qū)現(xiàn)在很多企業(yè)都對(duì)數(shù)據(jù)湖存
門店內(nèi),店員正熱情地為市民介紹享受補(bǔ)貼的洗烘一體機(jī)的功能,講解當(dāng)前
分時(shí)圖快速拉升意味此時(shí)存在大單買入,在大單的推動(dòng)下,股價(jià)快速地上漲
7月10日晚間,白云山(600332)發(fā)布關(guān)于“王老吉”商標(biāo)法律糾紛訴訟結(jié)
7月11日浦城縣強(qiáng)美礦業(yè)有限公司螢石出廠價(jià)格暫穩(wěn),廠家報(bào)價(jià)3000-3100元
近日,韓國愛寶樂園為大熊貓福寶招聘“一日飼養(yǎng)員助理”的公告,吸引逾