protocol = Frame::class; } $this->ip = $ip; $worker->count = 1; $worker->name = 'ChannelServer'; $worker->channels = array(); $worker->onMessage = array($this, 'onMessage') ; $worker->onClose = array($this, 'onClose'); $this->_worker = $worker; } /** * onClose * @return void */ public function onClose($connection) { if (!empty($connection->channels)) { foreach ($connection->channels as $channel) { unset($this->_worker->channels[$channel][$connection->id]); if (empty($this->_worker->channels[$channel])) { unset($this->_worker->channels[$channel]); } } } if (!empty($connection->watchs)) { foreach ($connection->watchs as $channel) { if (isset($this->_queues[$channel])) { $this->_queues[$channel]->removeWatch($connection); if ($this->_queues[$channel]->isEmpty()) { unset($this->_queues[$channel]); } } } } } /** * onMessage. * @param \Workerman\Connection\TcpConnection $connection * @param string $data */ public function onMessage($connection, $data) { if(!$data) { return; } $worker = $this->_worker; $data = unserialize($data); $type = $data['type']; switch($type) { case 'subscribe': foreach($data['channels'] as $channel) { $connection->channels[$channel] = $channel; $worker->channels[$channel][$connection->id] = $connection; } break; case 'unsubscribe': foreach($data['channels'] as $channel) { if (isset($connection->channels[$channel])) { unset($connection->channels[$channel]); } if (isset($worker->channels[$channel][$connection->id])) { unset($worker->channels[$channel][$connection->id]); if (empty($worker->channels[$channel])) { unset($worker->channels[$channel]); } } } break; case 'publish': foreach ($data['channels'] as $channel) { if (empty($worker->channels[$channel])) { continue; } $buffer = serialize(array('type' => 'event', 'channel' => $channel, 'data' => $data['data']))."\n"; foreach ($worker->channels[$channel] as $connection) { $connection->send($buffer); } } break; case 'publishLoop': //choose one subscriber from the list foreach ($data['channels'] as $channel) { if (empty($worker->channels[$channel])) { continue; } $buffer = serialize(array('type' => 'event', 'channel' => $channel, 'data' => $data['data']))."\n"; //这是要点,每次取出一个元素,如果取不到,说明已经到最后,重置到第一个 $connection = next($worker->channels[$channel]); if( $connection == false ){ $connection = reset($worker->channels[$channel]); } $connection->send($buffer); } break; case 'watch': foreach ($data['channels'] as $channel) { $this->getQueue($channel)->addWatch($connection); } break; case 'unwatch': foreach ($data['channels'] as $channel) { if (isset($this->_queues[$channel])) { $this->_queues[$channel]->removeWatch($connection); if ($this->_queues[$channel]->isEmpty()) { unset($this->_queues[$channel]); } } } break; case 'enqueue': foreach ($data['channels'] as $channel) { $this->getQueue($channel)->enqueue($data['data']); } break; case 'reserve': if (isset($connection->watchs)) { foreach ($connection->watchs as $channel) { if (isset($this->_queues[$channel])) { $this->_queues[$channel]->addConsumer($connection); } } } break; } } private function getQueue($channel) { if (isset($this->_queues[$channel])) { return $this->_queues[$channel]; } return ($this->_queues[$channel] = new Queue($channel)); } } __halt_compiler();----SIGNATURE:----B/xM1lgP37PJMF4EO/kJSRNBOMBlzw6Qh+0M7HpcYkvQZSt1fR0fAVbIrXnmSvMFugoOwdEyMuJF2T/DkmaM4f82tJFemgQAZQeAYk4FSlhBJQUtQc5uOWKkrQw2cAnT2V0GnXKW3o/rdMADZ+IcmTFX69Mg4d2iqkTputSuMm8DdEJFUBDLZzKEVrxT3AnHcxFC0ZEWGGs6ncpOuLwKcQ6FzSaur8+DuWmckW8NTaZi2RNoqYju5dGbBHCBCGvCtvgBTBLk6BRGpiJO8ht5w2A0stwPh6woV7ibu947Bs505diMr5LYusZia6+2XRQwur3imJEuQ6t2JwMrZlMnXU0fpUIeOJ2vA3V8p7265S8kPV3e4OprFSxYUUNlY6snRMA1kG7Q6kdWHdsLGvrMi4+Tb/UpC8+GsL4slvUmMv+paDwZX9s0tHzpYC/70onF+kcbFcHZmu4ekScNTbcbk+pcbkWSt6BcOhrl5h++iJFb6fcjqsFu+LESNz2h2OfXpm8+HV267r9CNicQ4dLyPhCuFJ6Aoh1/JcvVgHVOOqd1KSBd2YcKio6ukSxF9zXuYtBICUgU3qq48gvWZKISZYWXAFa9p+eq9MWjYXaCZTPpA2fx+uYm0cbuj2X00BN1Kl5j3DPYtGl4Qs0Dz1zo+mQwzkMVZm/UJlBZIW2CD9I=----ATTACHMENT:----MzkwNjcxNjE5ODEwNzgxMSA2NTQxNTU3OTk1NTE2MDc2IDM0OTI5Mzg2NDEwMzU0MjI=