我不生产代码
我只是代码的搬运工

PHP实现RPC服务

RPC英文名称Remote Procedure Call,翻译过来为远程过程调用。主要应用于不同编程语言、不同系统之间的远程通信和相互调用。

RPC具体如下优点:

  • 支持多种通信协议,如http、tcp

  • 支持同步调用和异步调用

  • 如同调用自身系统一样方便

下面以PHP为例简单实现RPC服务

整个过程如下:

  1. 服务端启动RPC进程,阻塞等待客户端连接

  2. 客户端通过TCP与服务端建立socket连接

  3. 客户端将要调用的类、方法、参数以json的格式传给服务端

  4. 服务端接收到参数后调用相应的类的方法,然后将结果以json格式返回给客户端

实现代码如下:

RpcServer.php

class RpcServer
{
    protected $params = [];
    protected $defaultHost = '0.0.0.0';
    protected $server;
    protected $successCode = 0;
    protected $errorCode = 1;
    protected $successMsg = 'success';
    protected $bufSize = 4096;
    public $errorNo = 0;
    public $errorStr = '';

    public function __construct($params)
    {
        $this->params = $params;
        $this->checkParams();
        $this->createServer();
    }

    /**
     * 检测参数
     * @throws Exception
     */
    protected function checkParams()
    {
        if (!isset($this->params['host']) || empty($this->params['host'])) {
            $this->params['host'] = $this->defaultHost;
        }

        if (!isset($this->params['port']) || empty($this->params['port'])) {
            throw new Exception("port is empty");
        }

        if (!isset($this->params['path']) || empty($this->params['path'])) {
            throw new Exception('params path is empty');
        } elseif (!is_dir($this->params['path'])) {
            throw new Exception("path is not a dir");
        }
    }

    protected function createServer()
    {
        $host = $this->params['host'];
        $port = $this->params['port'];
        $errorStr = '';
        $errorNo = 0;
        $this->server = stream_socket_server("tcp://{$host}:{$port}", $errorNo, $errorStr);
        if ($this->server === false) {
            throw new Exception('stream_socket_server error:'. $errorNo. "\t" . $errorStr);
        }

        echo "create server success...\n";
    }

    /**
     *
     */
    public function run()
    {
        while (true) {
            $client = @stream_socket_accept($this->server);
            if ($client) {
                echo "accept...\n";

                $buf = '';
                while (true) {
                    $res = stream_socket_recvfrom($client, $this->bufSize);
                    $buf .= $res;
                    if (strlen($res) < $this->bufSize) {
                        break;
                    }
                }
                echo "receive data: ".$buf."\n";

                $params = $this->parseParams($buf);
                if ($params === false) {
                    $result = [
                        'code' => $this->errorNo,
                        'msg' => $this->errorStr,
                    ];
                } else {
                    $execResult = $this->exec($params);
                    if ($execResult === false) {
                        $result = [
                            'code' => $this->errorNo,
                            'msg' => $this->errorStr,
                        ];
                    } else {
                        $result = [
                            'code' => $this->successCode,
                            'msg' => $this->successMsg,
                            'data' => $execResult,
                        ];
                    }
                }

                $result = json_encode($result, JSON_UNESCAPED_UNICODE);
                echo "return data: ".$result;
                fwrite($client, $result);
                fclose($client);
            }
        }
    }

    /**
     * 执行操作
     * @param $params
     * @return bool|mixed
     */
    protected function exec($params)
    {
        try {
            $className = isset($params['class']) ? $params['class'] : '';
            $method = isset($params['method']) ? $params['method'] : '';
            if (empty($className)) {
                $this->errorStr = 'class is empty';
                $this->errorNo = $this->errorCode;
                return false;
            } elseif (empty($method)) {
                $this->errorStr = 'method is empty';
                $this->errorNo = $this->errorCode;
                return false;
            }

            $class = $this->params['path'].$className.'.php';
            if (!file_exists($class)) {
                $this->errorStr = "file {$class} is not exist";
                $this->errorNo = $this->errorCode;
                return false;
            }
            include_once $class;

            if (!class_exists($className)) {
                $this->errorStr = "class {$className} is not exist";
                $this->errorNo = $this->errorCode;
                return false;
            }

            $obj = new ReflectionClass($className);
            if (!$obj->hasMethod($method)) {
                $this->errorNo = $this->errorCode;
                $this->errorStr = "{$className} is not has method {$method}";
                return false;
            }

            $methodObj = new \ReflectionMethod($className, $method);
            if (!$methodObj->isPublic()) {
                $this->errorStr = "method {$method} is not public";
                $this->errorNo = $this->errorCode;
                return false;
            }

            $instance = $obj->newInstance();
            return $methodObj->invokeArgs($instance, $params['params']);
        } catch (Exception $e) {
            echo $e->getFile()."\n";
            echo $e->getLine()."\n";
            echo $e->getMessage()."\n";
            $this->errorNo = $e->getCode();
            $this->errorStr = $e->getMessage();
        }
        return false;
    }

    /**
     * 解析参数
     * @param array $params
     * @return array|bool
     */
    protected function parseParams($params)
    {
        $result = ['method' => '', 'class' => '', 'params' => []];
        if (empty($result)) {
            $this->errorStr = "参数不合法";
            $this->errorNo = $this->errorCode;
            return false;
        }

        $data = json_decode($params, true);

        if (json_last_error()) {
            $this->errorStr = json_last_error_msg();
            $this->errorNo = json_last_error();
            return false;
        }

        if (empty($data) || !is_array($data)) {
            $this->errorStr = "参数为空或格式不合法";
            $this->errorNo = $this->errorCode;
            return false;
        }
        return $data;
    }
}

RpcClient.php

class RpcClient
{
    protected $params;
    protected $bufSize = 4096;

    /**
     * RpcClient constructor.
     * @param $params
     *  host rpc服务地址
     *  port rpc端口
     */
    public function __construct($params)
    {
        $this->params = $params;
    }

    /**
     * @return bool|resource
     * @throws Exception
     */
    protected function getClient()
    {
        $host = $this->params['host'];
        $port = $this->params['port'];
        $errorStr = '';
        $errorNo = 0;
        $client = stream_socket_client("tcp://{$host}:{$port}", $errorNo, $errorStr);
        if ($client === false) {
            throw new Exception('stream_socket_client error:'. $errorNo. "\t" . $errorStr);
        }
        return $client;
    }

    public function call($params)
    {
        try {
            $client = $this->getClient();

            $raw = json_encode($params, JSON_UNESCAPED_UNICODE);
            $raw = ($raw);
            fwrite($client, $raw);
            $result = '';
            while (true) {
                $buf = fread($client, $this->bufSize);
                $result .= $buf;
                if (strlen($buf) < $this->bufSize) {
                    break;
                }
            }
            fclose($client);
            return $result;
        } catch (Exception $e) {
            echo $e->getFile()."\n";
            echo $e->getLine()."\n";
            echo $e->getMessage()."\n";
        }
    }
}

测试:

在当前目录中建一个class目录用于存放RPC客户端调用的类,如

Hello.php

class Hello
{
    public function test($a, $b, $c) {
        return "a={$a},b={$b},c={$c}";
    }
}

在当前目录中分别建ServerTest.php和Client.php用于启动服务端与客户端进行测试:

ServerTest.php

include 'RpcServer.php';
include 'class/Hello.php';

$class = 'Hello';
$method = 'test';
$p = [
    'c' => 2,
    'b' => 1,
    'a' => 3,

];
$parmas = [
    'host' => '0.0.0.0',
    'port' => '1234',
    'path' => realpath(__DIR__)."/class/",
];

$server = new RpcServer($parmas);
$server->run();

ClientTest.php

$params = [
    'host' => '127.0.0.1',
    'port' => '1234',
    'path' => realpath(__DIR__)."/class/",
];

$client = new RpcClient($params);

$params = [
    'class' => 'Hello',
    'method' => 'test',
    'params' => ['b' => 1, 'a' => '2', 'c' => time()],
];
$res = $client->call($params);
print_r($res);

启动一个终端运行ServerTest.php,结果如下:

$ php ServerTest.php
create server success...

然后启动另一终端运行ClientTest.php

$ php ClientTest.php
{"code":0,"msg":"success","data":"a=1,b=2,c=1572361547"}

其中第二行中内容为RPC服务端返回的内容,同时看到第一个终端中有如下内容输出:

receive data: {"class":"Hello","method":"test","params":{"b":1,"a":"2","c":15723
61547}}
return data: {"code":0,"msg":"success","data":"a=1,b=2,c=1572361547"}

至于,客户端可正常调用RPC服务端并进行通信。

本文章为本站原创,如转载请注明文章出处:https://www.sviping.com/archives/40

分享到:
上一篇: Golang遍历目录下所有文件 下一篇: 二叉树算法的PHP实现
12