IV. Analyzing the EasySwoole Source Code (Server Startup & How the Cache Component Works) 2019-06-26

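As mentioned in the previous article, the system sets up the Cache component by calling Cache::getInstance().

Cache is implemented as a singleton. Its constructor does the following: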

//Create the configured number of Cache service processes, then start them.
$num = intval(Config::getInstance()->getConf("EASY_CACHE.PROCESS_NUM"));//the default is 1, set in Config.php as "EASY_CACHE.PROCESS_NUM"=>1
if($num <= 0){
    return;
}
$this->cliTemp = new SplArray();//only used when no server is running (e.g. in unit tests); in normal server mode this array is not used
//only when created inside the main server, not when called from a unit test
if(ServerManager::getInstance()->getServer()){
    //create a swoole_table named __Cache. Its data column holds serialized Cache command messages (explained below); Cache processes use it to hand results back to callers, and stale rows are garbage-collected so the table does not fill up
    TableManager::getInstance()->add(self::EXCHANGE_TABLE_NAME,[
        "data"=>[
            "type"=>Table::TYPE_STRING,
            "size"=>10*1024
        ],
        "microTime"=>[
            "type"=>Table::TYPE_STRING,
            "size"=>15
        ]
    ],2048);
    $this->processNum = $num;
    for ($i=0;$i < $num;$i++){
        ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class);
    }
}
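The constructor above combines two ideas: a lazily created singleton, and a guard that skips all setup when the configured process count is not positive. The following is a minimal sketch of just those two ideas, not EasySwoole's code. The class name MiniCache, the static $config array standing in for Config, and the process-name format are all hypothetical.

```php
<?php
// Hypothetical, simplified model of the Cache singleton's constructor logic.
// A static array stands in for Config; no Swoole APIs are used.
final class MiniCache
{
    private static ?MiniCache $instance = null;
    /** @var array<string, mixed> hypothetical stand-in for Config */
    public static array $config = ['EASY_CACHE.PROCESS_NUM' => 1];

    /** @var string[] names of the Cache processes that would be registered */
    private array $processNames = [];
    private int $processNum = 0;

    private function __construct()
    {
        $num = intval(self::$config['EASY_CACHE.PROCESS_NUM'] ?? 0);
        if ($num <= 0) {
            return; // no Cache processes configured: nothing to set up
        }
        $this->processNum = $num;
        for ($i = 0; $i < $num; $i++) {
            $this->processNames[] = $this->generateProcessName($i);
        }
    }

    public static function getInstance(): MiniCache
    {
        // Create the single shared instance on first use only.
        if (self::$instance === null) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    // Hypothetical naming scheme; EasySwoole's real format may differ.
    private function generateProcessName(int $i): string
    {
        return "EasyCacheProcess_{$i}";
    }

    public function getProcessNames(): array
    {
        return $this->processNames;
    }

    public function getProcessNum(): int
    {
        return $this->processNum;
    }
}

MiniCache::$config['EASY_CACHE.PROCESS_NUM'] = 3;
$cache = MiniCache::getInstance();
echo implode(', ', $cache->getProcessNames()), PHP_EOL;
```

Because the constructor runs only once, the config value must be in place before the first getInstance() call.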

The line ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class) is where the real core logic of Cache lies.

ProcessManager::getInstance() mainly does the following: ProcessManager's __construct creates a swoole_table named process_hash_map:

TableManager::getInstance()->add(
    "process_hash_map",[
        "pid"=>[
            "type"=>Table::TYPE_INT,
            "size"=>10
        ]
    ],256
);

In addProcess($this->generateProcessName($i),CacheProcess::class), the call $this->generateProcessName($i) is trivial: it just derives a process name from $i. addProcess stores an instance of CacheProcess::class in processList, as follows:

//processes are keyed by the md5 of their name; registering the same name twice is rejected
$key = md5($processName);
if(!isset($this->processList[$key])){
    try{
        $process = new $processClass($processName,$args,$async);
        $this->processList[$key] = $process;
        return true;
    }catch (Throwable $throwable){
        Trigger::throwable($throwable);
        return false;
    }
}else{
    trigger_error("you can not add the same name process : {$processName}.{$processClass}");
    return false;
}
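The registry logic above amounts to a map keyed by md5 of the process name that refuses duplicates. The sketch below reproduces only that behavior. MiniProcessManager and DummyProcess are hypothetical stand-ins for ProcessManager and CacheProcess, and where the original calls trigger_error() on a duplicate, this version just returns false.

```php
<?php
// Hypothetical stand-in for CacheProcess.
class DummyProcess
{
    private string $name;
    public function __construct(string $name) { $this->name = $name; }
    public function getName(): string { return $this->name; }
}

// Simplified model of ProcessManager::addProcess(): instances are stored under
// md5($processName), and a second registration with the same name fails.
class MiniProcessManager
{
    /** @var array<string, object> processes keyed by md5(process name) */
    private array $processList = [];

    public function addProcess(string $processName, string $processClass): bool
    {
        $key = md5($processName);
        if (isset($this->processList[$key])) {
            return false; // duplicate name (EasySwoole also calls trigger_error here)
        }
        try {
            $this->processList[$key] = new $processClass($processName);
            return true;
        } catch (Throwable $throwable) {
            return false; // construction failed
        }
    }

    public function getProcessByName(string $processName): ?object
    {
        return $this->processList[md5($processName)] ?? null;
    }
}

$pm = new MiniProcessManager();
var_dump($pm->addProcess('cache_0', DummyProcess::class)); // bool(true)
var_dump($pm->addProcess('cache_0', DummyProcess::class)); // bool(false): duplicate name
```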

So what does instantiating CacheProcess::class do?

$this->cacheData = new SplArray();//this is the key part: the values each Cache process actually caches are stored here, and every Cache process has its own cacheData array
$this->persistentTime = Config::getInstance()->getConf("EASY_CACHE.PERSISTENT_TIME");
parent::__construct($processName, $args);

CacheProcess::class extends AbstractProcess. The AbstractProcess constructor:

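$this->async = $async;
$this->args = $args;
$this->processName = $processName;
//false: don't redirect stdin/stdout; 2: create a SOCK_DGRAM pipe for talking to the process
$this->swooleProcess = new swoole_process([$this,"__start"],false,2);
//the swoole server then adds a Cache worker process via addProcess
ServerManager::getInstance()->getServer()->addProcess($this->swooleProcess);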

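The __start method mainly inserts a row into the swoole_table named process_hash_map, with md5 of the current CacheProcess's name as the key and its process ID (pid) as the value. It also registers a handler for the process's exit signal (SIGTERM).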

//setting the process name is skipped on macOS (Darwin)
if(PHP_OS != "Darwin"){
    $process->name($this->getProcessName());
}
TableManager::getInstance()->get("process_hash_map")->set(
    md5($this->processName),["pid"=>$this->swooleProcess->pid]
);
ProcessManager::getInstance()->setProcess($this->getProcessName(),$this);
if (extension_loaded("pcntl")) {
    pcntl_async_signals(true);
}
//on SIGTERM: run the shutdown hook, remove this process from process_hash_map, stop listening on the pipe, exit
Process::signal(SIGTERM,function ()use($process){
    $this->onShutDown();
    TableManager::getInstance()->get("process_hash_map")->del(md5($this->processName));
    swoole_event_del($process->pipe);
    $this->swooleProcess->exit(0);
});
//in async mode, every message arriving on the pipe is handed to onReceive()
if($this->async){
    swoole_event_add($this->swooleProcess->pipe, function(){
        $msg = $this->swooleProcess->read(64 * 1024);
        $this->onReceive($msg);
    });
}
$this->run($this->swooleProcess);
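The bookkeeping in __start is small. On start, a process records md5(name) => pid in a shared map. On SIGTERM, it deletes its own row, so the map always lists the live Cache processes. The sketch below models this with a plain array in place of the shared swoole_table. The class ProcessHashMap, its methods, and the pid value are hypothetical.

```php
<?php
// Simplified model of the process_hash_map bookkeeping done in __start():
// a plain array stands in for the shared swoole_table.
final class ProcessHashMap
{
    /** @var array<string, array{pid:int}> rows keyed by md5(process name) */
    private array $rows = [];

    // What __start() does: publish this process's pid under md5(name).
    public function onStart(string $processName, int $pid): void
    {
        $this->rows[md5($processName)] = ['pid' => $pid];
    }

    // What the SIGTERM handler does: remove this process's row again.
    public function onShutDown(string $processName): void
    {
        unset($this->rows[md5($processName)]);
    }

    public function pidOf(string $processName): ?int
    {
        return $this->rows[md5($processName)]['pid'] ?? null;
    }
}

$map = new ProcessHashMap();
$map->onStart('cache_0', 12345); // hypothetical pid
echo $map->pidOf('cache_0'), PHP_EOL; // 12345
$map->onShutDown('cache_0');
var_dump($map->pidOf('cache_0')); // NULL
```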

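$this->run($this->swooleProcess): in CacheProcess, if persistentTime is configured, this starts a timer that periodically backs up the cache data to $file = Config::getInstance()->getConf("TEMP_DIR")."/{$processName}.data";. The default is 0, which means no periodic persistence to disk.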
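Here is a rough sketch of what such a backup could look like. It assumes the cache contents are a plain array written with serialize() to "{TEMP_DIR}/{processName}.data". The article does not show the real file format, so that format is an assumption. In CacheProcess the save would be driven by a timer every persistentTime; here the functions are called directly. persistFile, saveCache and loadCache are hypothetical helpers.

```php
<?php
// Hypothetical helpers: build the backup path, save, and restore one
// Cache process's data, assuming serialize() as the on-disk format.
function persistFile(string $tempDir, string $processName): string
{
    return $tempDir . "/{$processName}.data";
}

function saveCache(array $cacheData, string $file): bool
{
    // LOCK_EX takes an exclusive (advisory) lock while writing.
    return file_put_contents($file, serialize($cacheData), LOCK_EX) !== false;
}

function loadCache(string $file): array
{
    if (!is_file($file)) {
        return []; // nothing has been persisted yet
    }
    $data = unserialize((string) file_get_contents($file));
    return is_array($data) ? $data : [];
}

$file = persistFile(sys_get_temp_dir(), 'cache_demo_0');
saveCache(['user:1' => 'alice', 'hits' => 3], $file);
print_r(loadCache($file));
unlink($file);
```

Restoring from the same file when the process starts is what would make the backup useful after a restart.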

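That covers everything the Cache component does when it is first instantiated. In short: it creates the configured number of Cache processes and attaches them to the swoole server. The process ID of each Cache process can be looked up in the global process_hash_map table, and the Cache processes can be reached through their pipes.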

 

The set method

public function set($key,$data){
    if(!ServerManager::getInstance()->isStart()){
        $this->cliTemp->set($key,$data);
    }
    if(ServerManager::getInstance()->getServer()){
        $num = $this->keyToProcessNum($key);
        $msg = new Msg();
        $msg->setCommand("set");
        $msg->setArg("key",$key);
        $msg->setData($data);
        //wrap the data to cache in a Msg and write it straight to the Cache process the key hashes to
        ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num))->getProcess()->write(swoole_serialize::pack($msg));
    }
}
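For set() and get() to agree, keyToProcessNum() must send the same key to the same Cache process every time. Each process holds its own private cacheData. The sketch below models the processes as separate PHP arrays. The abs(crc32($key)) % n hash is a hypothetical stand-in: the article does not show how EasySwoole's keyToProcessNum() actually hashes.

```php
<?php
// Sketch of key-to-process routing: hash the key, take it modulo the process
// count, and read/write only that process's store.
final class RoutedCache
{
    /** @var array<int, array<string, mixed>> one isolated store per "process" */
    private array $stores;
    private int $processNum;

    public function __construct(int $processNum)
    {
        $this->processNum = $processNum;
        $this->stores = array_fill(0, $processNum, []);
    }

    // Hypothetical hash: deterministic, so a key always lands on the same store.
    public function keyToProcessNum(string $key): int
    {
        return abs(crc32($key)) % $this->processNum;
    }

    public function set(string $key, $data): void
    {
        $this->stores[$this->keyToProcessNum($key)][$key] = $data;
    }

    public function get(string $key)
    {
        return $this->stores[$this->keyToProcessNum($key)][$key] ?? null;
    }

    public function storeSizes(): array
    {
        return array_map('count', $this->stores);
    }
}

$cache = new RoutedCache(4);
foreach (['a', 'b', 'c', 'd', 'e', 'f'] as $k) {
    $cache->set($k, strtoupper($k));
}
print_r($cache->storeSizes()); // how the six keys spread over four stores
```

One consequence of modulo routing is that changing EASY_CACHE.PROCESS_NUM remaps most keys to different processes.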

When the Cache process receives the message, its onReceive method is called back:

public function onReceive(string $str,...$args){
    $msg = swoole_serialize::unpack($str);
    $table = TableManager::getInstance()->get(Cache::EXCHANGE_TABLE_NAME);
    if(count($table) > 1900){
        //run a GC check when the table nears its 2048-row capacity
        //iterating a Table depends on PCRE; if the table cannot be iterated, check that pcre-devel is installed
        //an entry older than 0.1s is almost certainly (99.99%) stale data
        //note: round() without a precision rounds to whole seconds, so in practice only entries about 0.5s or older are removed
        $time = microtime(true);
        foreach ($table as $key => $item){
            if(round($time - $item["microTime"]) > 0.1){
                $table->del($key);
            }
        }
    }
    if($msg instanceof Msg){
        switch ($msg->getCommand()){
            case "set":{
                $this->cacheData->set($msg->getArg("key"),$msg->getData());
                break;
            }
            case "get":{
                $ret = $this->cacheData->get($msg->getArg("key"));
                $msg->setData($ret);
                $table->set($msg->getToken(),[
                    "data"=>swoole_serialize::pack($msg),
                    "microTime"=>microtime(true)
                ]);
                break;
            }
            case "del":{
                $this->cacheData->delete($msg->getArg("key"));
                break;
            }
            case "flush":{
                $this->cacheData->flush();
                break;
            }
            case "enQueue":{
                $que = $this->cacheData->get($msg->getArg("key"));
                if(!$que instanceof SplQueue){
                    $que = new SplQueue();
                    $this->cacheData->set($msg->getArg("key"),$que);
                }
                $que->enqueue($msg->getData());
                break;
            }
            case "deQueue":{
                $que = $this->cacheData->get($msg->getArg("key"));
                if(!$que instanceof SplQueue){
                    $que = new SplQueue();
                    $this->cacheData->set($msg->getArg("key"),$que);
                }
                $ret = null;
                if(!$que->isEmpty()){
                    $ret = $que->dequeue();
                }
                $msg->setData($ret);
                //deQueue can also be requested from the CLI while the server is not started; such requests carry no token
                if(!empty($msg->getToken())){
                    $table->set($msg->getToken(),[
                        "data"=>swoole_serialize::pack($msg),
                        "microTime"=>microtime(true)
                    ]);
                }
                break;
            }
            case "queueSize":{
                $que = $this->cacheData->get($msg->getArg("key"));
                if(!$que instanceof SplQueue){
                    $que = new SplQueue();
                }
                $msg->setData($que->count());
                $table->set($msg->getToken(),[
                    "data"=>swoole_serialize::pack($msg),
                    "microTime"=>microtime(true)
                ]);
                break;
            }
        }
    }
}
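The queue branches of onReceive() follow one pattern: create an SplQueue under the key on first use, return null when dequeuing from an empty queue, and report a size of 0 for a key that holds no queue. Below is a simplified single-process model of just those three commands. The class QueueStore and its handle(command, key, data) signature are hypothetical simplifications of the Msg-based dispatch, and a plain array replaces the SplArray cacheData.

```php
<?php
// Simplified model of the enQueue / deQueue / queueSize branches.
final class QueueStore
{
    /** @var array<string, mixed> stand-in for the process's cacheData */
    private array $cacheData = [];

    public function handle(string $command, string $key, $data = null)
    {
        $que = $this->cacheData[$key] ?? null;
        switch ($command) {
            case 'enQueue':
                if (!$que instanceof SplQueue) {
                    $que = new SplQueue(); // first use of this key
                    $this->cacheData[$key] = $que;
                }
                $que->enqueue($data);
                return null;
            case 'deQueue':
                if (!$que instanceof SplQueue) {
                    $que = new SplQueue(); // mirrors the source: create on miss
                    $this->cacheData[$key] = $que;
                }
                return $que->isEmpty() ? null : $que->dequeue();
            case 'queueSize':
                return $que instanceof SplQueue ? $que->count() : 0;
            default:
                throw new InvalidArgumentException("unknown command: {$command}");
        }
    }
}

$store = new QueueStore();
$store->handle('enQueue', 'jobs', 'job-1');
$store->handle('enQueue', 'jobs', 'job-2');
echo $store->handle('deQueue', 'jobs'), PHP_EOL;   // job-1 (FIFO order)
echo $store->handle('queueSize', 'jobs'), PHP_EOL; // 1
```

Because each key lives in exactly one Cache process, and a process handles its pipe messages one at a time, these queue operations need no extra locking.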

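The method first runs a GC pass over the __Cache exchange table. Once the table holds more than 1900 of its 2048 rows, stale result rows are deleted so the table does not fill up.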
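Here is a sketch of that GC sweep: once the row count passes the threshold, delete every row whose microTime is older than a cutoff. This assumes the caller behind such a row has already timed out. A plain array models the swoole_table, and gcExchangeTable is a hypothetical helper. Unlike the source, the age test here compares the raw float difference with the cutoff, without round().

```php
<?php
const GC_THRESHOLD = 1900; // mirrors the source's threshold for a 2048-row table

/**
 * Delete rows older than $maxAge seconds once the table is nearly full.
 * @param array<string, array{data:string, microTime:float}> $table
 * @return int number of rows removed
 */
function gcExchangeTable(array &$table, float $now, float $maxAge = 0.1): int
{
    if (count($table) <= GC_THRESHOLD) {
        return 0; // far enough from capacity: skip the scan
    }
    $removed = 0;
    foreach ($table as $token => $row) {
        if ($now - $row['microTime'] > $maxAge) {
            unset($table[$token]); // safe: by-value foreach iterates a snapshot
            $removed++;
        }
    }
    return $removed;
}

$now = microtime(true);
$table = [];
for ($i = 0; $i < 1950; $i++) {
    // even rows are 1 s old (stale), odd rows were written just now
    $table["t{$i}"] = ['data' => 'x', 'microTime' => $i % 2 === 0 ? $now - 1.0 : $now];
}
echo gcExchangeTable($table, $now), PHP_EOL; // 975
echo count($table), PHP_EOL;                 // 975
```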

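For the set command, the process simply stores the value in $this->cacheData. On the caller's side, set() is fire-and-forget: it writes to the pipe and returns without waiting for an acknowledgment.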

 

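The get method is different. It sends a get command to the Cache process, which, on reading the command, writes the value into the __Cache swoole_table. The caller then reads the cached content back out of that table by polling it in a while loop (a kind of spin-wait). The benefit is that the caller is guaranteed to get the value as it was when its own request was handled, rather than a newer value written by concurrent requests. And because every result passes through this one table, stale rows can be garbage-collected there, which keeps the exchange table from filling up.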

public function get($key,$timeOut = 0.01){
    if(!ServerManager::getInstance()->isStart()){
        return $this->cliTemp->get($key);
    }
    $num = $this->keyToProcessNum($key);
    //the token is a ticket that ensures we get back exactly the cache data requested at this moment; similar in spirit to repeatable reads in transactions
    $token = Random::randStr(9);
    $process = ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num));
    $msg = new Msg();
    $msg->setArg("timeOut",$timeOut);
    $msg->setArg("key",$key);
    $msg->setCommand("get");
    $msg->setToken($token);
    $process->getProcess()->write(swoole_serialize::pack($msg));
    return $this->read($token,$timeOut);
}

$process->getProcess()->write(swoole_serialize::pack($msg)) sends this packet to the Cache process, which then does the following:

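$ret = $this->cacheData->get($msg->getArg("key"));//fetch the current cached value
$msg->setData($ret);
//write the result into the __Cache table under the token the request carried, unchanged. The benefit: under high concurrency, a value requested at time A is not replaced by a value updated later at time B.
$table->set($msg->getToken(),[
    "data"=>swoole_serialize::pack($msg),
    "microTime"=>microtime(true)
]);

Meanwhile, the caller's get() is waiting in $this->read($token,$timeOut):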
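The token handshake can be simulated in a single process. The caller tags its request with a random token. The "Cache process" answers by writing the current value into a shared exchange table under that token. The caller then collects only the row that carries its own token. In this sketch, bin2hex(random_bytes(8)) stands in for Random::randStr(9), arrays stand in for the swoole_table and cacheData, and the class ExchangeDemo and its methods are hypothetical.

```php
<?php
// Single-process simulation of the token-based get() round trip.
final class ExchangeDemo
{
    public array $cacheData = [];
    /** @var array<string, array{data:mixed, microTime:float}> */
    public array $exchange = [];

    // Caller side: build a get request carrying a fresh random token.
    public function makeGetRequest(string $key): array
    {
        return ['command' => 'get', 'key' => $key, 'token' => bin2hex(random_bytes(8))];
    }

    // Cache-process side: answer by writing the current value under the token.
    public function handle(array $msg): void
    {
        $this->exchange[$msg['token']] = [
            'data' => $this->cacheData[$msg['key']] ?? null,
            'microTime' => microtime(true),
        ];
    }

    // Caller side: take (and delete) only the row for this token.
    public function collect(string $token)
    {
        $row = $this->exchange[$token] ?? null;
        unset($this->exchange[$token]);
        return $row['data'] ?? null;
    }
}

$demo = new ExchangeDemo();
$demo->cacheData['price'] = 100;
$reqA = $demo->makeGetRequest('price');
$demo->handle($reqA);            // answered at time A
$demo->cacheData['price'] = 120; // value updated at time B
$reqB = $demo->makeGetRequest('price');
$demo->handle($reqB);
echo $demo->collect($reqA['token']), PHP_EOL; // 100: A's snapshot, not B's value
echo $demo->collect($reqB['token']), PHP_EOL; // 120
```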

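read() fetches the result straight from the __Cache table. If the entry shows up before the wait times out, its data is returned. Either way, the row for the token is then deleted from the table.

private function read($token,$timeOut){
    $table = TableManager::getInstance()->get(self::EXCHANGE_TABLE_NAME);
    $start = microtime(true);
    $data = null;
    while(true){
        usleep(1);
        if($table->exist($token)){
            $data = $table->get($token)["data"];
            $data = swoole_serialize::unpack($data);
            if(!$data instanceof Msg){
                $data = null;
            }
            break;
        }
        //the quoted source computes round($start - microtime(true),3), which is never positive, so its timeout branch can never fire; elapsed time is microtime(true) - $start
        if(round(microtime(true) - $start,3) > $timeOut){
            break;
        }
    }
    $table->del($token);
    if($data){
        return $data->getData();
    }else{
        return null;
    }
}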
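The polling loop can be exercised on its own. The sketch polls a table for the caller's token until the row appears or $timeOut seconds have elapsed, with elapsed time measured as microtime(true) - $start. It then deletes the row either way. A plain array stands in for the swoole_table. The $onPoll callback is a hypothetical hook that lets this single-process demo play the part of a Cache process answering a little later.

```php
<?php
// Sketch of read(): spin-wait on a shared table for $token, bounded by $timeOut.
function readWithTimeout(array &$table, string $token, float $timeOut, ?callable $onPoll = null)
{
    $start = microtime(true);
    $data = null;
    while (true) {
        usleep(1);
        if ($onPoll !== null) {
            $onPoll($table); // demo hook: a fake "Cache process" may write its answer
        }
        if (isset($table[$token])) {
            $data = $table[$token]['data'];
            break;
        }
        if (microtime(true) - $start > $timeOut) {
            break; // give up: no answer arrived in time
        }
    }
    unset($table[$token]); // remove the row; a no-op if it never arrived
    return $data;
}

$table = [];
$polls = 0;
// Hypothetical fake Cache process that answers on the third poll.
$fakeCacheProcess = function (array &$t) use (&$polls): void {
    if (++$polls === 3) {
        $t['tok1'] = ['data' => 'hello', 'microTime' => microtime(true)];
    }
};
var_dump(readWithTimeout($table, 'tok1', 0.5, $fakeCacheProcess)); // string(5) "hello"
var_dump(readWithTimeout($table, 'tok2', 0.01));                   // NULL after ~10 ms
```

The trade-off of this design is that a waiting caller burns CPU in usleep(1) steps. The short default timeout (0.01s in get()) bounds that cost, at the price of returning null when a Cache process is busy.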

 
