站長資訊網
最全最豐富的資訊網站

一文聊聊Node中的進程間通信

進程間怎么進行通信?下面本篇文章給大家介紹一下Node進程間通信的原理,希望對大家有所幫助!

一文聊聊Node中的進程間通信

前端(vue)入門到精通課程:進入學習

前置知識

文件描述符

在 Linux 系統中,一切都看成文件,當進程打開現有文件時,會返回一個文件描述符。 文件描述符是操作系統為了管理已經被進程打開的文件所創建的索引,用來指向被打開的文件。 當我們的進程啟動之后,操作系統會給每一個進程分配一個 PCB 控制塊,PCB 中會有一個文件描述符表,存放當前進程所有的文件描述符,即當前進程打開的所有文件。

? 進程中的文件描述符是如何和系統文件對應起來的? 在內核中,系統會維護另外兩種表

  • 打開文件表(Open file table)
  • i-node 表(i-node table)

文件描述符就是數組的下標,從0開始往上遞增,0/1/2 默認是我們的輸入/輸出/錯誤流的文件描述符 在 PCB 中維護的文件描述表中,可以根據文件描述符找到對應了文件指針,找到對應的打開文件表 打開文件表中維護了:文件偏移量(讀寫文件的時候會更新);對于文件的狀態標識;指向 i-node 表的指針 想要真正的操作文件,還得靠 i-node 表,能夠獲取到真實文件的相關信息

他們之間的關系

一文聊聊Node中的進程間通信

圖解

  • 在進程 A 中,文件描述符1/20均指向了同一打開文件表項23,這可能是對同一文件多次調用了 open 函數形成的
  • 進程 A/B 的文件描述符2都指向同一文件,這可能是調用了 fork 創建子進程,A/B 是父子關系進程
  • 進程 A 的文件描述符0和進程 B 的文件描述符指向了不同的打開文件表項,但這些表項指向了同一個文件,這可能是 A/B 進程分別對同一文件發起了 open 調用

總結

  • 同一進程的不同文件描述符可以指向同一個文件
  • 不同進程可以擁有相同的文件描述符
  • 不同進程的同一文件描述符可以指向不同的文件
  • 不同進程的不同文件描述符可以指向同一個文件

文件描述符的重定向

每次讀寫進程的時候,都是從文件描述符下手,找到對應的打開文件表項,再找到對應的 i-node 表

?如何實現文件描述符重定向? 因為在文件描述符表中,能夠找到對應的文件指針,如果我們改變了文件指針,是不是后續的兩個表內容就發生了改變 例如:文件描述符1指向的顯示器,那么將文件描述符1指向 log.txt 文件,那么文件描述符 1 也就和 log.txt 對應起來了

shell 對文件描述符的重定向

> 是輸出重定向符號,< 是輸入重定向符號,它們是文件描述符操作符 > 和 < 通過修改文件描述符改變了文件指針的指向,來能夠實現重定向的功能

我們使用cat hello.txt時,默認會將結果輸出到顯示器上,使用 > 來重定向。cat hello.txt 1 > log.txt以輸出的方式打開文件 log.txt,并綁定到文件描述符1上

一文聊聊Node中的進程間通信

c函數對文件描述符的重定向

dup

dup 函數是用來打開一個新的文件描述符,指向和 oldfd 同一個文件,共享文件偏移量和文件狀態

int main(int argc, char const *argv[]) {     int fd = open("log.txt");     int copyFd = dup(fd);     //將fd閱讀文件置于文件末尾,計算偏移量。     cout << "fd = " << fd << " 偏移量: " << lseek(fd, 0, SEEK_END) << endl;     //現在我們計算copyFd的偏移量     cout << "copyFd = " << copyFd << "偏移量:" << lseek(copyFd, 0, SEEK_CUR) << endl;     return 0; }

一文聊聊Node中的進程間通信

調用 dup(3) 的時候,會打開新的最小描述符,也就是4,這個4指向了3所指向的文件,操作任意一個 fd 都是修改的一個文件

dup2

dup2 函數,把指定的 newfd 也指向 oldfd 指向的文件。執行完dup2之后,newfd 和 oldfd 同時指向同一個文件,共享文件偏移量和文件狀態

int main(int argc, char const *argv[]) {     int fd = open("log.txt");     int copyFd = dup(fd);     //將fd閱讀文件置于文件末尾,計算偏移量。     cout << "fd = " << fd << " 偏移量: " << lseek(fd, 0, SEEK_END) << endl;     //現在我們計算copyFd的偏移量     cout << "copyFd = " << copyFd << "偏移量:" << lseek(copyFd, 0, SEEK_CUR) << endl;     return 0; }

一文聊聊Node中的進程間通信

Node中通信原理

Node 中的 IPC 通道具體實現是由 libuv 提供的。根據系統的不同實現方式不同,window 下采用命名管道實現,*nix 下采用 Domain Socket 實現。在應用層只體現為 message 事件和 send 方法。【相關教程推薦:nodejs視頻教程】

一文聊聊Node中的進程間通信

父進程在實際創建子進程之前,會創建 IPC 通道并監聽它,等到創建出真實的子進程后,通過環境變量(NODE_CHANNEL_FD)告訴子進程該 IPC 通道的文件描述符。

子進程在啟動的過程中,會根據該文件描述符去連接 IPC 通道,從而完成父子進程的連接。

建立連接之后可以自由的通信了,IPC 通道是使用命名管道或者 Domain Socket 創建的,屬于雙向通信。并且它是在系統內核中完成的進程通信

一文聊聊Node中的進程間通信

⚠️ 只有在啟動的子進程是 Node 進程時,子進程才會根據環境變量去連接對應的 IPC 通道,對于其他類型的子進程則無法實現進程間通信,除非其他進程也按著該約定去連接這個 IPC 通道。

unix domain socket

是什么

我們知道經典的通信方式是有 Socket,我們平時熟知的 Socket 是基于網絡協議的,用于兩個不同主機上的兩個進程通信,通信需要指定 IP/Host 等。 但如果我們同一臺主機上的兩個進程想要通信,如果使用 Socket 需要指定 IP/Host,經過網絡協議等,會顯得過于繁瑣。所以 Unix Domain Socket 誕生了。

UDS 的優勢:

  • 綁定 socket 文件而不是綁定 IP/Host;不需要經過網絡協議,而是數據的拷貝
  • 也支持 SOCK_STREAM(流套接字)和 SOCK_DGRAM(數據包套接字),但由于是在本機通過內核通信,不會丟包也不會出現發送包的次序和接收包的次序不一致的問題

如何實現

流程圖

一文聊聊Node中的進程間通信

Server 端
int main(int argc, char *argv[]) {     int server_fd ,ret, client_fd;     struct sockaddr_un serv, client;     socklen_t len = sizeof(client);     char buf[1024] = {0};     int recvlen;      // 創建 socket     server_fd = socket(AF_LOCAL, SOCK_STREAM, 0);      // 初始化 server 信息     serv.sun_family = AF_LOCAL;     strcpy(serv.sun_path, "server.sock");      // 綁定     ret = bind(server_fd, (struct sockaddr *)&serv, sizeof(serv));      //設置監聽,設置能夠同時和服務端連接的客戶端數量     ret = listen(server_fd, 36);      //等待客戶端連接     client_fd = accept(server_fd, (struct sockaddr *)&client, &len);     printf("=====client bind file:%sn", client.sun_path);      while (1) {         recvlen = recv(client_fd, buf, sizeof(buf), 0);         if (recvlen == -1) {             perror("recv error");             return -1;         } else if (recvlen == 0) {             printf("client disconnet...n");             close(client_fd);             break;         } else {             printf("recv buf %sn", buf);             send(client_fd, buf, recvlen, 0);         }     }      close(client_fd);     close(server_fd);     return 0; }
Client 端
int main(int argc, char *argv[]) {     int client_fd ,ret;     struct sockaddr_un serv, client;     socklen_t len = sizeof(client);     char buf[1024] = {0};     int recvlen;      //創建socket     client_fd = socket(AF_LOCAL, SOCK_STREAM, 0);      //給客戶端綁定一個套接字文件     client.sun_family = AF_LOCAL;     strcpy(client.sun_path, "client.sock");     ret = bind(client_fd, (struct sockaddr *)&client, sizeof(client));      //初始化server信息     serv.sun_family = AF_LOCAL;     strcpy(serv.sun_path, "server.sock");     //連接     connect(client_fd, (struct sockaddr *)&serv, sizeof(serv));      while (1) {         fgets(buf, sizeof(buf), stdin);         send(client_fd, buf, strlen(buf)+1, 0);          recv(client_fd, buf, sizeof(buf), 0);         printf("recv buf %sn", buf);     }      close(client_fd);     return 0; }

命名管道(Named Pipe)

是什么

命名管道是可以在同一臺計算機的不同進程之間,或者跨越一個網絡的不同計算機的不同進程之間的可靠的單向或者雙向的數據通信。 創建命名管道的進程被稱為管道服務端(Pipe Server),連接到這個管道的進程稱為管道客戶端(Pipe Client)。

命名管道的命名規范:serverpipe[path]name

  • 其中 server 指定一個服務器的名字,本機適用 . 表示,192.10.10.1 表示網絡上的服務器
  • pipe 是一個不可變化的字串,用于指定該文件屬于 NPFS(Named Pipe File System)
  • [path]name 是唯一命名管道名稱的標識

怎么實現

流程圖

一文聊聊Node中的進程間通信

Pipe Server
void ServerTest() {     HANDLE  serverNamePipe;     char    pipeName[MAX_PATH] = {0};     char    szReadBuf[MAX_BUFFER] = {0};     char    szWriteBuf[MAX_BUFFER] = {0};     DWORD   dwNumRead = 0;     DWORD   dwNumWrite = 0;      strcpy(pipeName, "\\.\pipe\shuangxuPipeTest");     // 創建管道實例     serverNamePipe = CreateNamedPipeA(pipeName,         PIPE_ACCESS_DUPLEX|FILE_FLAG_WRITE_THROUGH,         PIPE_TYPE_BYTE|PIPE_READMODE_BYTE|PIPE_WAIT,         PIPE_UNLIMITED_INSTANCES, 0, 0, 0, NULL);     WriteLog("創建管道成功...");     // 等待客戶端連接     BOOL bRt= ConnectNamedPipe(serverNamePipe, NULL );     WriteLog( "收到客戶端的連接成功...");     // 接收數據     memset( szReadBuf, 0, MAX_BUFFER );     bRt = ReadFile(serverNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL );     // 業務邏輯處理 (只為測試用返回原來的數據)     WriteLog( "收到客戶數據:[%s]", szReadBuf);     // 發送數據     if( !WriteFile(serverNamePipe, szWriteBuf, dwNumRead, &dwNumWrite, NULL ) )     {         WriteLog("向客戶寫入數據失敗:[%#x]", GetLastError());         return ;     }     WriteLog("寫入數據成功..."); }
Pipe Client
void ClientTest() {     char    pipeName[MAX_PATH] = {0};     HANDLE  clientNamePipe;     DWORD   dwRet;     char    szReadBuf[MAX_BUFFER] = {0};     char    szWriteBuf[MAX_BUFFER] = {0};     DWORD   dwNumRead = 0;     DWORD   dwNumWrite = 0;      strcpy(pipeName, "\\.\pipe\shuangxuPipeTest");     // 檢測管道是否可用     if(!WaitNamedPipeA(pipeName, 10000)){         WriteLog("管道[%s]無法打開", pipeName);         return ;     }     // 連接管道     clientNamePipe = CreateFileA(pipeName,         GENERIC_READ|GENERIC_WRITE,         0,         NULL,         OPEN_EXISTING,         FILE_ATTRIBUTE_NORMAL,         NULL);     WriteLog("管道連接成功...");     scanf( "%s", szWritebuf );     // 發送數據     if( !WriteFile(clientNamePipe, szWriteBuf, strlen(szWriteBuf), &dwNumWrite, NULL)){         WriteLog("發送數據失敗,GetLastError=[%#x]", GetLastError());         return ;     }     printf("發送數據成功:%sn", szWritebuf );     // 接收數據     if( !ReadFile(clientNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL)){         WriteLog("接收數據失敗,GetLastError=[%#x]", GetLastError() );         return ;     }     WriteLog( "接收到服務器返回:%s", szReadBuf );     // 關閉管道     CloseHandle(clientNamePipe); }

Node 創建子進程的流程

Unix

一文聊聊Node中的進程間通信

對于創建子進程、創建管道、重定向管道均是在 c++ 層實現的

創建子進程

int main(int argc,char *argv[]){     pid_t pid = fork();     if (pid < 0) {         // 錯誤     } else if(pid == 0) {         // 子進程     } else {         // 父進程     } }

創建管道

使用 socketpair 創建管道,其創建出來的管道是全雙工的,返回的文件描述符中的任何一個都可讀和可寫

int main () {     int fd[2];     int r = socketpair(AF_UNIX, SOCK_STREAM, 0, fd);      if (fork()){ /* 父進程 */         int val = 0;         close(fd[1]);         while (1){             sleep(1);             ++val;             printf("發送數據: %dn", val);             write(fd[0], &val, sizeof(val));             read(fd[0], &val, sizeof(val));             printf("接收數據: %dn", val);         }     } else {  /*子進程*/         int val;         close(fd[0]);         while(1){             read(fd[1], &val, sizeof(val));             ++val;             write(fd[1], &val, sizeof(val));         }     } }

當我們使用 socketpair 創建了管道之后,父進程關閉了 fd[1],子進程關閉了 fd[0]。子進程可以通過 fd[1] 讀寫數據;同理主進程通過 fd[0]讀寫數據完成通信。

對應代碼:https://github.com/nodejs/node/blob/main/deps/uv/src/unix/process.c#L344

child_process.fork 的詳細調用

fork 函數開啟一個子進程的流程

一文聊聊Node中的進程間通信

  • 初始化參數中的 options.stdio,并且調用 spawn 函數

    function spawn(file, args, options) {   const child = new ChildProcess();    child.spawn(options); }
  • 創建 ChildProcess 實例,創建子進程也是調用 C++ 層 this._handle.spawn 方法

    function ChildProcess() { 	// C++層定義 	this._handle = new Process(); }
  • 通過 child.spawn 調用到 ChildProcess.prototype.spawn 方法中。其中 getValidStdio 方法會根據 options.stdio 創建和 C++ 交互的 Pipe 對象,并獲得對應的文件描述符,將文件描述符寫入到環境變量 NODE_CHANNEL_FD 中,調用 C++ 層創建子進程,在調用 setupChannel 方法

    ChildProcess.prototype.spawn = function(options) {   // 預處理進程間通信的數據結構 	stdio = getValidStdio(stdio, false); 	const ipc = stdio.ipc;     const ipcFd = stdio.ipcFd; 	//將文件描述符寫入環境變量中 	if (ipc !== undefined) {     ArrayPrototypePush(options.envPairs, `NODE_CHANNEL_FD=${ipcFd}`);   } 	// 創建進程 	const err = this._handle.spawn(options);     // 添加send方法和監聽IPC中數據 	if (ipc !== undefined) setupChannel(this, ipc, serialization); }
  • 子進程啟動時,會根據環境變量中是否存在 NODE_CHANNEL_FD 判斷是否調用 _forkChild 方法,創建一個 Pipe 對象, 同時調用 open 方法打開對應的文件描述符,在調用setupChannel

    function _forkChild(fd, serializationMode) {   const p = new Pipe(PipeConstants.IPC);   p.open(fd);   p.unref();   const control = setupChannel(process, p, serializationMode); }

句柄傳遞

setupChannel 主要是完成了處理接收的消息、發送消息、處理文件描述符傳遞等

function setipChannel(){ 	channel.onread = function(arrayBuffer){ 		//... 	} 	target.on('internalMessage', function(message, handle){ 		//... 	}) 	target.send = function(message, handle, options, callback){ 		//... 	} 	target._send = function(message, handle, options, callback){ 		//... 	} 	function handleMessage(message, handle, internal){ 		//... 	} }
  • target.send: process.send 方法,這里 target 就是進程對象本身.
  • target._send: 執行具體 send 邏輯的函數, 當參數 handle 不存在時, 表示普通的消息傳遞;若存在,包裝為內部對象,表明是一個 internalMessage 事件觸發。調用使用JSON.stringify 序列化對象, 使用channel.writeUtf8String 寫入文件描述符中
  • channel.onread: 獲取到數據時觸發, 跟 channel.writeUtf8String 相對應。通過 JSON.parse 反序列化 message 之后, 調用 handleMessage 進而觸發對應事件
  • handleMessage: 用來判斷是觸發 message 事件還是 internalMessage 事件
  • target.on('internalMessage'): 針對內部對象做特殊處理,在調用 message 事件

一文聊聊Node中的進程間通信

進程間消息傳遞

  • 父進程通過 child.send 發送消息 和 server/socket 句柄對象

  • 普通消息直接 JSON.stringify 序列化;對于句柄對象來說,需要先包裝成為內部對象

    message = { 	cmd: 'NODE_HANDLE', 	type: null, 	msg: message };

    通過 handleConversion.[message.type].send 的方法取出句柄對象對應的 C++ 層面的 TCP 對象,在采用JSON.stringify 序列化

    const handleConversion = { 	'net.Server': {     simultaneousAccepts: true,      send(message, server, options) {       return server._handle;     },      got(message, handle, emit) {       const server = new net.Server();       server.listen(handle, () => {         emit(server);       });     }   } //.... }
  • 最后將序列化后的內部對象和 TCP 對象寫入到 IPC 通道中

  • 子進程在接收到消息之后,使用 JSON.parse 反序列化消息,如果為內部對象觸發 internalMessage 事件

  • 檢查是否帶有 TCP 對象,通過 handleConversion.[message.type].got 得到和父進程一樣的句柄對象

  • 最后發觸發 message 事件傳遞處理好的消息和句柄對象,子進程通過 process.on 接收

贊(0)
分享到: 更多 (0)
網站地圖   滬ICP備18035694號-2    滬公網安備31011702889846號
国产在线精品香蕉麻豆| 久久精品一区二区国产| 国产精品资源一区二区| 国产午夜亚洲精品国产| 精品人妻中文字幕有码在线 | 久久久久成人精品免费播放动漫| 最新精品国偷自产在线| 亚洲国产午夜中文字幕精品黄网站| 中文字幕一区二区三区日韩精品| 国产麻豆剧传媒精品国产免费| 久久国产精品99国产精| 久久久这里只有精品加勒比| 精品国产乱子伦一区二区三区| 久久久久99精品成人片试看| 日韩一区精品视频一区二区| 久久国产乱子伦精品在| 久热re这里只有精品视频| 国产精品区一区二区三在线播放 | 国产在线91精品入口| 国产精品内射久久久久欢欢| 一区二区精品久久| www.99精品视频在线播放| 午夜亚洲av永久无码精品| 午夜精品久久久久久影视777| 日韩三级电影视频| 日韩毛片免费在线观看| 在线观看日韩一区| 老司机精品福利在线| 一色屋精品视频任你曰| 亚洲精品成人在线| 国产精品99精品视频网站| 国产精品gz久久久| 精品久人妻去按摩店被黑人按中出 | 黄床大片免费30分钟国产精品| 538国产精品一区二区在线| 国产精品男男视频一区二区三区| 久久99精品久久久久久野外| a级精品九九九大片免费看| 亚洲精品国产精品乱码视色| 日本精品一区二区三区在线观看| 久久国产精品国产精品|