经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » PHP » 查看文章
Swoole MySQL 连接池的实现 - it-world
来源:cnblogs  作者:it-world  时间:2019/11/7 20:45:48  对本文有异议

开始今天的文章,这篇文章实现了 Swoole MySQL 连接池,代码是在《Swoole RPC 的实现》文章的基础上进行开发的。

先回顾上篇文章的内容:

    实现了 HTTP / TCP 请求

    实现了 同步 / 异步 请求

    分享了 OnRequest.php、OnReceive.php 源码

    业务逻辑 Order.php 中返回的是假数据

本篇文章主要的功能点:

    业务逻辑 Order.php 中返回 MySQL 数据库中的数据。

    Task 启用了协程

    支持 主/从 数据库配置

    实现数据库连接池

    实现数据库 CRUD(增删改查)

代码

  1. Order.php
  2.  
  3. <?php
  4. if (!defined('SERVER_PATH')) exit("No Access");
  /**
   * Order service stub from the previous article: serves hard-coded sample rows.
   */
  class Order
  {
      /**
       * Return a fixed list of three sample orders.
       *
       * Neither argument is read; the rows are static placeholder data.
       *
       * @param int $uid  User id (unused).
       * @param int $type Order type (unused).
       * @return array List of rows, each holding 'order_code' and 'order_name'.
       */
      public function get_list($uid = 0, $type = 0)
      {
          //TODO business logic
          return [
              ['order_code' => '1', 'order_name' => '订单1'],
              ['order_code' => '2', 'order_name' => '订单2'],
              ['order_code' => '3', 'order_name' => '订单3'],
          ];
      }
  }
  19. 修改成:
  /**
   * Order service that reads and writes the `order` table through a pooled
   * MysqlDB connection.
   */
  class Order
  {
      private $mysql;
      private $table;

      /**
       * Borrow a connection from the already-initialised MysqlPool and fix
       * the table this service operates on.
       */
      public function __construct()
      {
          $this->table = 'order';
          $this->mysql = MysqlPool::getInstance()->get();
      }

      /**
       * Insert a new order row.
       *
       * @param string $code Order code.
       * @param string $name Order name.
       * @return mixed Whatever MysqlDB::insert() returns.
       */
      public function add($code = '', $name = '')
      {
          //TODO validation
          $row = ['code' => $code, 'name' => $name];

          return $this->mysql->insert($this->table, $row);
      }

      /**
       * Rename the order identified by $id.
       *
       * @param int    $id   Primary key of the row to change.
       * @param string $name New order name.
       * @return mixed Whatever MysqlDB::update() returns.
       */
      public function edit($id = 0, $name='')
      {
          //TODO validation
          $changes = ['name' => $name];
          $filter = ['id' => $id];

          return $this->mysql->update($this->table, $changes, $filter);
      }

      /**
       * Delete the order identified by $id.
       *
       * @param int $id Primary key of the row to remove.
       * @return mixed Whatever MysqlDB::delete() returns.
       */
      public function del($id = 0)
      {
          //TODO validation
          $filter = ['id' => $id];

          return $this->mysql->delete($this->table, $filter);
      }

      /**
       * Look up orders by order code.
       *
       * @param string $code Order code to match.
       * @return mixed Whatever MysqlDB::select() returns.
       */
      public function info($code = '')
      {
          //TODO validation
          $filter = ['code' => $code];

          return $this->mysql->select($this->table, $filter);
      }
  }

 



Task 启用协程

一、需要新增两项配置:

  1. enable_coroutine = true
  2. task_enable_coroutine = true

 



二、回调参数发生改变

 

  1. $serv->on('Task', function ($serv, $task_id, $src_worker_id, $data) {
  2. ...
  3. });
  4. 修改成:
  5. $serv->on('Task', function ($serv, $task) {
  6. $task->worker_id; // which `Worker` process the task came from
  7. $task->id; // task number
  8. $task->data; // task payload
  9. });

 



数据库 主/从 配置

  1. Mysql.php
  2.  
  3. <?php
  4. if (!defined('SERVER_PATH')) exit("No Access");
  // Settings shared by the master and slave connections.
  $db['default'] = [
      'pool_size'        => 3,     // number of connections kept in the pool
      'pool_get_timeout' => 0.5,   // timeout when taking a connection from the pool (presumably seconds)
      'timeout'          => 0.5,   // timeout when establishing a database connection (presumably seconds)
      'charset'          => 'utf8', // connection character set
      'strict_type'      => false, // strict-type mode switch (off here)
      'fetch_mode'       => true,  // fetch mode switch (on here)
  ];

  // Master: shared defaults followed by the endpoint and credentials.
  $config['master'] = array_merge($db['default'], [
      'host'     => '127.0.0.1',
      'port'     => 3306,
      'user'     => 'root',
      'password' => '123456',
      'database' => 'demo',
  ]);

  // Slave: in this demo it points at the same server as the master.
  $config['slave'] = array_merge($db['default'], [
      'host'     => '127.0.0.1',
      'port'     => 3306,
      'user'     => 'root',
      'password' => '123456',
      'database' => 'demo',
  ]);

 



数据库连接池

  1. MysqlPool.php
  2.  
  3. <?php
  4. if (!defined('SERVER_PATH')) exit("No Access");
  /**
   * Singleton pool of MysqlDB connections held in a Swoole channel.
   *
   * The channel capacity and the number of connections opened both come from
   * $config['master']['pool_size'].
   */
  class MysqlPool
  {
      private static $instance;
      private $pool;
      private $config;

      /**
       * Return the shared pool, creating it on the first call.
       *
       * @param array|null $config Database configuration; required only on the
       *                           call that creates the pool, ignored afterwards.
       * @return static
       * @throws RuntimeException When the pool does not exist yet and $config is empty.
       */
      public static function getInstance($config = null)
      {
          if (self::$instance === null) {
              if (empty($config)) {
                  throw new RuntimeException("MySQL config empty");
              }
              self::$instance = new static($config);
          }
          return self::$instance;
      }

      /**
       * Create the channel and start one coroutine per pooled connection.
       *
       * Each coroutine opens a MysqlDB connection and pushes it into the
       * channel, so the pool fills up asynchronously.
       *
       * NOTE(review): an exception thrown inside go() is raised in the child
       * coroutine; it presumably cannot be caught by a try/catch wrapped around
       * getInstance() — confirm against the Swoole version in use.
       *
       * @param array $config Must contain 'master' (with 'pool_size' and
       *                      'pool_get_timeout') and whatever MysqlDB::connect() reads.
       */
      public function __construct($config)
      {
          $size = $config['master']['pool_size'];

          $this->config = $config;
          $this->pool = new chan($size);

          for ($i = 0; $i < $size; $i++) {
              go(function () use ($config) {
                  $mysql = new MysqlDB();
                  $res = $mysql->connect($config);
                  if ($res === false) {
                      throw new RuntimeException("Failed to connect mysql server");
                  }
                  $this->pool->push($mysql);
              });
          }
      }

      /**
       * Borrow a connection for the current coroutine.
       *
       * Waits up to 'pool_get_timeout' for a connection to become available.
       * The previous implementation threw "Pool length <= 0" immediately
       * whenever the channel was momentarily empty (all connections borrowed,
       * or the constructor's coroutines still connecting), so the configured
       * timeout never took effect.
       *
       * The connection is handed back through defer(), i.e. the caller must not
       * keep using it after the current coroutine has finished.
       *
       * @return MysqlDB
       * @throws RuntimeException When no connection becomes available in time.
       */
      public function get()
      {
          $mysql = $this->pool->pop($this->config['master']['pool_get_timeout']);
          if (false === $mysql) {
              throw new RuntimeException("Pop mysql timeout");
          }

          // Release: return the connection to the pool when defer() fires.
          defer(function () use ($mysql) {
              $this->pool->push($mysql);
          });

          return $mysql;
      }
  }

 



数据库 CRUD

  1. MysqlDB.php
  2.  
  3. <?php
  4. if (!defined('SERVER_PATH')) exit("No Access");
  /**
   * Thin CRUD wrapper around a master and a slave Swoole coroutine MySQL client.
   *
   * Statements starting with SELECT are sent to the slave, everything else to
   * the master. A client whose `connected` flag is false is reconnected
   * before use.
   *
   * SECURITY: SQL is still assembled as text. String values are escaped with
   * addslashes(), which is the convention insert() already used; prepared
   * statements would be the more robust choice if the driver setup allows it.
   * Table names are interpolated as given and must never come from user input.
   */
  class MysqlDB
  {
      private $master;
      private $slave;
      private $config;

      /**
       * Expose `query($sql)` as the only dynamic method.
       *
       * @param string $name      Method name being called.
       * @param array  $arguments Call arguments; $arguments[0] is the SQL text.
       * @return mixed Result of _execute().
       * @throws RuntimeException For any method name other than 'query'.
       */
      public function __call($name, $arguments)
      {
          if ($name !== 'query') {
              throw new RuntimeException($name.":This command is not supported");
          }
          return $this->_execute($arguments[0]);
      }

      /**
       * Open the master and the slave connection and remember the configuration
       * for later reconnects.
       *
       * @param array $config Must contain 'master' and 'slave' connection settings.
       * @return bool Always true; failures are reported by exception.
       * @throws RuntimeException When either connection cannot be established.
       */
      public function connect($config)
      {
          // Master first, then slave.
          $this->master = $this->_open($config['master']);
          $this->slave = $this->_open($config['slave']);
          $this->config = $config;
          return true;
      }

      /**
       * Insert one row.
       *
       * Every value is written as a quoted, addslashes()-escaped string.
       *
       * @param string $table Table name (trusted).
       * @param array  $data  Column => value map.
       * @return mixed Result of _execute().
       */
      public function insert($table = '', $data = [])
      {
          $columns = [];
          $literals = [];
          foreach ($data as $column => $value) {
              $columns[] = $this->_quote_ident($column);
              $literals[] = "'".addslashes($value)."'";
          }
          $fields = implode(', ', $columns);
          $values = implode(', ', $literals);
          $sql = "INSERT INTO `{$table}` ({$fields}) VALUES ({$values})";
          return $this->_execute($sql);
      }

      /**
       * Update the rows matching $where.
       *
       * @param string $table Table name (trusted).
       * @param array  $set   Column => new value map.
       * @param array  $where Column => value equality conditions, AND-ed together.
       * @return mixed Result of _execute().
       */
      public function update($table = '', $set = [], $where = [])
      {
          $assignments = [];
          foreach ($set as $column => $value) {
              $assignments[] = $this->_quote_ident($column).' = '.$this->_escape($value);
          }
          $set = implode(', ', $assignments);
          $where = $this->_where($where);
          $sql = "UPDATE `{$table}` SET {$set} {$where}";
          return $this->_execute($sql);
      }

      /**
       * Delete the rows matching $where.
       *
       * An empty $where produces "WHERE 1", i.e. it matches every row.
       *
       * @param string $table Table name (trusted).
       * @param array  $where Column => value equality conditions, AND-ed together.
       * @return mixed Result of _execute().
       */
      public function delete($table = '', $where = [])
      {
          $where = $this->_where($where);
          $sql = "DELETE FROM `{$table}` {$where}";
          return $this->_execute($sql);
      }

      /**
       * Select all columns of the rows matching $where.
       *
       * @param string $table Table name (trusted).
       * @param array  $where Column => value equality conditions, AND-ed together.
       * @return mixed Result of _execute().
       */
      public function select($table = '',$where = [])
      {
          $where = $this->_where($where);
          $sql = "SELECT * FROM `{$table}` {$where}";
          return $this->_execute($sql);
      }

      /**
       * Build a WHERE clause of AND-ed equality tests.
       *
       * @param array $where Column => value map.
       * @return string Clause beginning with "WHERE 1 ".
       */
      private function _where($where = [])
      {
          $str_where = '';
          foreach ($where as $column => $value) {
              $str_where .= " AND ".$this->_quote_ident($column)." = ".$this->_escape($value);
          }
          return "WHERE 1 ".$str_where;
      }

      /**
       * Turn a PHP value into an SQL literal.
       *
       * Strings are escaped and single-quoted, booleans become 0/1, null
       * becomes NULL, and any other value is returned unchanged.
       *
       * @param mixed $str Value to convert.
       * @return mixed SQL literal.
       */
      private function _escape($str)
      {
          if (is_string($str)) {
              // Escape before quoting so a quote inside the value cannot end
              // the literal early (the original concatenated the raw string).
              return "'".addslashes($str)."'";
          }
          if (is_bool($str)) {
              return $str ? 1 : 0;
          }
          if ($str === null) {
              return 'NULL';
          }
          return $str;
      }

      /**
       * Quote a column identifier with backticks, doubling any backtick in it.
       *
       * @param string $name Column name.
       * @return string Quoted identifier.
       */
      private function _quote_ident($name)
      {
          return '`'.str_replace('`', '``', $name).'`';
      }

      /**
       * Run a statement on the slave (SELECT) or the master (anything else).
       *
       * @param string $sql SQL text.
       * @return mixed ['affected_rows' => ..., 'insert_id' => ...] when the
       *               driver's query() returns true, otherwise the raw result.
       */
      private function _execute($sql)
      {
          // Ignore leading whitespace so an indented SELECT still goes to the slave.
          $verb = strtolower(substr(ltrim($sql), 0, 6));
          $db = $this->_get_usable_db($verb === 'select' ? 'slave' : 'master');

          $result = $db->query($sql);
          if ($result === true) {
              return [
                  'affected_rows' => $db->affected_rows,
                  'insert_id' => $db->insert_id,
              ];
          }
          return $result;
      }

      /**
       * Return the client for the given role, reconnecting it first when its
       * `connected` flag is false.
       *
       * @param string $type 'master' or 'slave'.
       * @return object The Swoole coroutine MySQL client for that role.
       * @throws RuntimeException On an unknown role or a failed reconnect.
       */
      private function _get_usable_db($type)
      {
          if ($type === 'master') {
              if (!$this->master->connected) {
                  $this->master = $this->_open($this->config['master']);
              }
              return $this->master;
          }
          if ($type === 'slave') {
              if (!$this->slave->connected) {
                  $this->slave = $this->_open($this->config['slave']);
              }
              return $this->slave;
          }
          throw new RuntimeException($type.":Unknown database role");
      }

      /**
       * Create a coroutine MySQL client and connect it.
       *
       * @param array $settings Connection settings for a single server.
       * @return object The connected client.
       * @throws RuntimeException Carrying the client's connect_error and errno.
       */
      private function _open($settings)
      {
          $client = new Swoole\Coroutine\MySQL();
          if ($client->connect($settings) === false) {
              throw new RuntimeException($client->connect_error, $client->errno);
          }
          return $client;
      }
  }

 



OnWorkerStart 中调用

 

  // Create the MySQL pool when the worker starts; on any error or exception
  // raised by this call, shut the server down.
  try {
      MysqlPool::getInstance(get_config('mysql'));
  } catch (\Throwable $failure) {
      // \Throwable covers \Exception as well, so a single handler is enough.
      $serv->shutdown();
  }

 


客户端发送请求

  

  1. <?php
  // Wrap a call to an Order method in the request envelope the server expects.
  $envelope = function ($method, array $args) {
      return [
          'type' => 'SW',
          'token' => 'Bb1R3YLipbkTp5p0',
          'param' => [
              'class' => 'Order',
              'method' => $method,
              'param' => $args,
          ],
      ];
  };

  // The four samples below overwrite one another; only the last assignment
  // (the query) is actually sent. Keep whichever one you want to try.

  // add
  $demo = $envelope('add', [
      'code' => 'C'.mt_rand(1000,9999),
      'name' => '订单-'.mt_rand(1000,9999),
  ]);

  // edit
  $demo = $envelope('edit', [
      'id' => '4',
      'name' => '订单-'.mt_rand(1000,9999),
  ]);

  // delete
  $demo = $envelope('del', [
      'id' => '1',
  ]);

  // query
  $demo = $envelope('info', [
      'code' => 'C4649'
  ]);

  // POST the JSON-encoded request to the HTTP endpoint.
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_URL, 'http://10.211.55.4:9509/');
  curl_setopt($ch, CURLOPT_POST, 1);
  curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($demo));
  curl_exec($ch);
  curl_close($ch);

 

原文链接:http://www.cnblogs.com/it-3327/p/11811917.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号