MediaWiki
master
|
00001 <?php 00038 class MemcLockManager extends QuorumLockManager { 00040 protected $lockTypeMap = array( 00041 self::LOCK_SH => self::LOCK_SH, 00042 self::LOCK_UW => self::LOCK_SH, 00043 self::LOCK_EX => self::LOCK_EX 00044 ); 00045 00047 protected $bagOStuffs = array(); 00049 protected $serversUp = array(); // (server name => bool) 00050 00051 protected $lockExpiry; // integer; maximum time locks can be held 00052 protected $session = ''; // string; random SHA-1 UUID 00053 protected $wikiId = ''; // string 00054 00069 public function __construct( array $config ) { 00070 parent::__construct( $config ); 00071 00072 // Sanitize srvsByBucket config to prevent PHP errors 00073 $this->srvsByBucket = array_filter( $config['srvsByBucket'], 'is_array' ); 00074 $this->srvsByBucket = array_values( $this->srvsByBucket ); // consecutive 00075 00076 $memcConfig = isset( $config['memcConfig'] ) 00077 ? $config['memcConfig'] 00078 : array( 'class' => 'MemcachedPhpBagOStuff' ); 00079 00080 foreach ( $config['lockServers'] as $name => $address ) { 00081 $params = array( 'servers' => array( $address ) ) + $memcConfig; 00082 $cache = ObjectCache::newFromParams( $params ); 00083 if ( $cache instanceof MemcachedBagOStuff ) { 00084 $this->bagOStuffs[$name] = $cache; 00085 } else { 00086 throw new MWException( 00087 'Only MemcachedBagOStuff classes are supported by MemcLockManager.' ); 00088 } 00089 } 00090 00091 $this->wikiId = isset( $config['wikiId'] ) ? $config['wikiId'] : wfWikiID(); 00092 00093 $met = ini_get( 'max_execution_time' ); // this is 0 in CLI mode 00094 $this->lockExpiry = $met ? 2*(int)$met : 2*3600; 00095 00096 $this->session = wfRandomString( 32 ); 00097 } 00098 00103 protected function getLocksOnServer( $lockSrv, array $paths, $type ) { 00104 $status = Status::newGood(); 00105 00106 $memc = $this->getCache( $lockSrv ); 00107 $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records 00108 00109 // Lock all of the active lock record keys... 00110 if ( !$this->acquireMutexes( $memc, $keys ) ) { 00111 foreach ( $paths as $path ) { 00112 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00113 } 00114 return; 00115 } 00116 00117 // Fetch all the existing lock records... 00118 $lockRecords = $memc->getMulti( $keys ); 00119 00120 $now = time(); 00121 // Check if the requested locks conflict with existing ones... 00122 foreach ( $paths as $path ) { 00123 $locksKey = $this->recordKeyForPath( $path ); 00124 $locksHeld = isset( $lockRecords[$locksKey] ) 00125 ? $lockRecords[$locksKey] 00126 : array( self::LOCK_SH => array(), self::LOCK_EX => array() ); // init 00127 foreach ( $locksHeld[self::LOCK_EX] as $session => $expiry ) { 00128 if ( $expiry < $now ) { // stale? 00129 unset( $locksHeld[self::LOCK_EX][$session] ); 00130 } elseif ( $session !== $this->session ) { 00131 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00132 } 00133 } 00134 if ( $type === self::LOCK_EX ) { 00135 foreach ( $locksHeld[self::LOCK_SH] as $session => $expiry ) { 00136 if ( $expiry < $now ) { // stale? 00137 unset( $locksHeld[self::LOCK_SH][$session] ); 00138 } elseif ( $session !== $this->session ) { 00139 $status->fatal( 'lockmanager-fail-acquirelock', $path ); 00140 } 00141 } 00142 } 00143 if ( $status->isOK() ) { 00144 // Register the session in the lock record array 00145 $locksHeld[$type][$this->session] = $now + $this->lockExpiry; 00146 // We will update this record if none of the other locks conflict 00147 $lockRecords[$locksKey] = $locksHeld; 00148 } 00149 } 00150 00151 // If there were no lock conflicts, update all the lock records... 00152 if ( $status->isOK() ) { 00153 foreach ( $lockRecords as $locksKey => $locksHeld ) { 00154 $memc->set( $locksKey, $locksHeld ); 00155 wfDebug( __METHOD__ . ": acquired lock on key $locksKey.\n" ); 00156 } 00157 } 00158 00159 // Unlock all of the active lock record keys... 00160 $this->releaseMutexes( $memc, $keys ); 00161 00162 return $status; 00163 } 00164 00169 protected function freeLocksOnServer( $lockSrv, array $paths, $type ) { 00170 $status = Status::newGood(); 00171 00172 $memc = $this->getCache( $lockSrv ); 00173 $keys = array_map( array( $this, 'recordKeyForPath' ), $paths ); // lock records 00174 00175 // Lock all of the active lock record keys... 00176 if ( !$this->acquireMutexes( $memc, $keys ) ) { 00177 foreach ( $paths as $path ) { 00178 $status->fatal( 'lockmanager-fail-releaselock', $path ); 00179 } 00180 return; 00181 } 00182 00183 // Fetch all the existing lock records... 00184 $lockRecords = $memc->getMulti( $keys ); 00185 00186 // Remove the requested locks from all records... 00187 foreach ( $paths as $path ) { 00188 $locksKey = $this->recordKeyForPath( $path ); // lock record 00189 if ( !isset( $lockRecords[$locksKey] ) ) { 00190 continue; // nothing to do 00191 } 00192 $locksHeld = $lockRecords[$locksKey]; 00193 if ( is_array( $locksHeld ) && isset( $locksHeld[$type] ) ) { 00194 unset( $locksHeld[$type][$this->session] ); 00195 $ok = $memc->set( $locksKey, $locksHeld ); 00196 } else { 00197 $ok = true; 00198 } 00199 if ( !$ok ) { 00200 $status->fatal( 'lockmanager-fail-releaselock', $path ); 00201 } 00202 wfDebug( __METHOD__ . ": released lock on key $locksKey.\n" ); 00203 } 00204 00205 // Unlock all of the active lock record keys... 00206 $this->releaseMutexes( $memc, $keys ); 00207 00208 return $status; 00209 } 00210 00215 protected function releaseAllLocks() { 00216 return Status::newGood(); // not supported 00217 } 00218 00223 protected function isServerUp( $lockSrv ) { 00224 return (bool)$this->getCache( $lockSrv ); 00225 } 00226 00233 protected function getCache( $lockSrv ) { 00234 $memc = null; 00235 if ( isset( $this->bagOStuffs[$lockSrv] ) ) { 00236 $memc = $this->bagOStuffs[$lockSrv]; 00237 if ( !isset( $this->serversUp[$lockSrv] ) ) { 00238 $this->serversUp[$lockSrv] = $memc->set( 'MemcLockManager:ping', 1, 1 ); 00239 if ( !$this->serversUp[$lockSrv] ) { 00240 trigger_error( __METHOD__ . ": Could not contact $lockSrv.", E_USER_WARNING ); 00241 } 00242 } 00243 if ( !$this->serversUp[$lockSrv] ) { 00244 return null; // server appears to be down 00245 } 00246 } 00247 return $memc; 00248 } 00249 00254 protected function recordKeyForPath( $path ) { 00255 $hash = LockManager::sha1Base36( $path ); 00256 list( $db, $prefix ) = wfSplitWikiID( $this->wikiId ); 00257 return wfForeignMemcKey( $db, $prefix, __CLASS__, 'locks', $hash ); 00258 } 00259 00265 protected function acquireMutexes( MemcachedBagOStuff $memc, array $keys ) { 00266 $lockedKeys = array(); 00267 00268 // Acquire the keys in lexicographical order, to avoid deadlock problems. 00269 // If P1 is waiting to acquire a key P2 has, P2 can't also be waiting for a key P1 has. 00270 sort( $keys ); 00271 00272 // Try to quickly loop to acquire the keys, but back off after a few rounds. 00273 // This reduces memcached spam, especially in the rare case where a server acquires 00274 // some lock keys and dies without releasing them. Lock keys expire after a few minutes. 00275 $rounds = 0; 00276 $start = microtime( true ); 00277 do { 00278 if ( ( ++$rounds % 4 ) == 0 ) { 00279 usleep( 1000*50 ); // 50 ms 00280 } 00281 foreach ( array_diff( $keys, $lockedKeys ) as $key ) { 00282 if ( $memc->add( "$key:mutex", 1, 180 ) ) { // lock record 00283 $lockedKeys[] = $key; 00284 } else { 00285 continue; // acquire in order 00286 } 00287 } 00288 } while ( count( $lockedKeys ) < count( $keys ) && ( microtime( true ) - $start ) <= 6 ); 00289 00290 if ( count( $lockedKeys ) != count( $keys ) ) { 00291 $this->releaseMutexes( $lockedKeys ); // failed; release what was locked 00292 return false; 00293 } 00294 00295 return true; 00296 } 00297 00303 protected function releaseMutexes( MemcachedBagOStuff $memc, array $keys ) { 00304 foreach ( $keys as $key ) { 00305 $memc->delete( "$key:mutex" ); 00306 } 00307 } 00308 00312 function __destruct() { 00313 while ( count( $this->locksHeld ) ) { 00314 foreach ( $this->locksHeld as $path => $locks ) { 00315 $this->doUnlock( array( $path ), self::LOCK_EX ); 00316 $this->doUnlock( array( $path ), self::LOCK_SH ); 00317 } 00318 } 00319 } 00320 }