// ReplicaSetStatus.java /** * Copyright (C) 2008 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.mongodb; import com.mongodb.util.JSON; import org.bson.util.annotations.Immutable; import org.bson.util.annotations.ThreadSafe; import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; // TODO: // pull config to get // priority // slave delay /** * Keeps replica set status. Maintains a background thread to ping all members of the set to keep the status current. */ @ThreadSafe public class ReplicaSetStatus { static final Logger _rootLogger = Logger.getLogger( "com.mongodb.ReplicaSetStatus" ); ReplicaSetStatus( Mongo mongo, List<ServerAddress> initial ){ _mongoOptions = _mongoOptionsDefaults.copy(); _mongoOptions.socketFactory = mongo._options.socketFactory; _mongo = mongo; _updater = new Updater(initial); } void start() { _updater.start(); } public String getName() { return _setName.get(); } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("{replSetName: ").append(_setName.get()); sb.append(", nextResolveTime: ").append(new Date(_updater.getNextResolveTime()).toString()); sb.append(", members: ").append(_replicaSetHolder); sb.append(", updaterIntervalMS: ").append(updaterIntervalMS); sb.append(", updaterIntervalNoMasterMS: ").append(updaterIntervalNoMasterMS); sb.append(", slaveAcceptableLatencyMS: ").append(slaveAcceptableLatencyMS); sb.append(", inetAddrCacheMS: ").append(inetAddrCacheMS); sb.append(", latencySmoothFactor: ").append(latencySmoothFactor); sb.append("}"); return sb.toString(); } void _checkClosed(){ if ( _closed ) throw new IllegalStateException( "ReplicaSetStatus closed" ); } /** * @return master or null if don't have one */ public ServerAddress getMaster(){ Node n = getMasterNode(); if ( n == null ) return null; return n.getServerAddress(); } Node getMasterNode(){ _checkClosed(); return _replicaSetHolder.get().getMaster(); } /** * @param srv * the server to compare * @return indication if the ServerAddress is the current Master/Primary */ public boolean isMaster(ServerAddress srv) { if (srv == null) return false; return srv.equals(getMaster()); } /** * @param tags tags map * @return a good secondary by tag value or null if can't find one */ ServerAddress getASecondary( DBObject tags ) { // store the reference in local, so that it doesn't change out from under us while looping List<Tag> tagList = new ArrayList<Tag>(); for ( String key : tags.keySet() ) { tagList.add(new Tag(key, tags.get(key).toString())); } Node node = _replicaSetHolder.get().getASecondary(tagList); if (node != null) { return node.getServerAddress(); } return null; } /** * @return a good secondary or null if can't find one */ ServerAddress getASecondary() { Node node = _replicaSetHolder.get().getASecondary(); if (node == null) { return null; } return node._addr; } boolean hasServerUp() { for (Node node : _replicaSetHolder.get().getAll()) { if (node.isOk()) { return true; } } return false; } // Simple abstraction over a volatile ReplicaSet reference that starts as null. The get method blocks until members // is not null. The set method notifies all, thus waking up all getters. @ThreadSafe static class ReplicaSetHolder { private volatile ReplicaSet members; // blocks until replica set is set. synchronized ReplicaSet get() { while (members == null) { try { wait(); } catch (InterruptedException e) { throw new MongoException("Interrupted while waiting for next update to replica set status", e); } } return members; } // set the replica set to a non-null value and notifies all threads waiting. synchronized void set(ReplicaSet members) { if (members == null) { throw new IllegalArgumentException("members can not be null"); } this.members = members; notifyAll(); } // blocks until the replica set is set again synchronized void waitForNextUpdate() { try { wait(); } catch (InterruptedException e) { throw new MongoException("Interrupted while waiting for next update to replica set status", e); } } public synchronized void close() { this.members = null; notifyAll(); } public String toString() { ReplicaSet cur = this.members; if (cur != null) { return cur.toString(); } return "none"; } } // Immutable snapshot state of a replica set. Since the nodes don't change state, this class pre-computes the list // of good secondaries so that choosing a random good secondary is dead simple @Immutable static class ReplicaSet { final List<Node> all; final Random random; final List<Node> goodSecondaries; final Map<Tag, List<Node>> goodSecondariesByTagMap; final Node master; public ReplicaSet(List<Node> nodeList, Random random, int acceptableLatencyMS) { this.random = random; this.all = Collections.unmodifiableList(new ArrayList<Node>(nodeList)); this.goodSecondaries = Collections.unmodifiableList(calculateGoodSecondaries(all, calculateBestPingTime(all), acceptableLatencyMS)); Set<Tag> uniqueTags = new HashSet<Tag>(); for (Node curNode : all) { for (Tag curTag : curNode._tags) { uniqueTags.add(curTag); } } Map<Tag, List<Node>> goodSecondariesByTagMap = new HashMap<Tag, List<Node>>(); for (Tag curTag : uniqueTags) { List<Node> taggedMembers = getMembersByTag(all, curTag); goodSecondariesByTagMap.put(curTag, Collections.unmodifiableList(calculateGoodSecondaries(taggedMembers, calculateBestPingTime(taggedMembers), acceptableLatencyMS))); } this.goodSecondariesByTagMap = Collections.unmodifiableMap(goodSecondariesByTagMap); master = findMaster(); } public List<Node> getAll() { return all; } public boolean hasMaster() { return getMaster() != null; } public Node getMaster() { return master; } public int getMaxBsonObjectSize() { if (hasMaster()) { return getMaster().getMaxBsonObjectSize(); } else { return Bytes.MAX_OBJECT_SIZE; } } public Node getASecondary() { if (goodSecondaries.isEmpty()) { return null; } return goodSecondaries.get(random.nextInt(goodSecondaries.size())); } public Node getASecondary(List<Tag> tags) { for (Tag tag : tags) { List<Node> goodSecondariesByTag = goodSecondariesByTagMap.get(tag); if (goodSecondariesByTag != null) { Node node = goodSecondariesByTag.get(random.nextInt(goodSecondariesByTag.size())); if (node != null) { return node; } } } return null; } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("[ "); for (Node node : getAll()) sb.append(node.toJSON()).append(","); sb.setLength(sb.length() - 1); //remove last comma sb.append(" ]"); return sb.toString(); } public Node findMaster() { for (Node node : all) { if (node.master()) return node; } return null; } static float calculateBestPingTime(List<Node> members) { float bestPingTime = Float.MAX_VALUE; for (Node cur : members) { if (!cur.secondary()) { continue; } if (cur._pingTime < bestPingTime) { bestPingTime = cur._pingTime; } } return bestPingTime; } static List<Node> calculateGoodSecondaries(List<Node> members, float bestPingTime, int acceptableLatencyMS) { List<Node> goodSecondaries = new ArrayList<Node>(members.size()); for (Node cur : members) { if (!cur.secondary()) { continue; } if (cur._pingTime - acceptableLatencyMS <= bestPingTime ) { goodSecondaries.add(cur); } } return goodSecondaries; } static List<Node> getMembersByTag(List<Node> members, Tag tag) { List<Node> membersByTag = new ArrayList<Node>(); for (Node cur : members) { if (cur._tags.contains(tag)) { membersByTag.add(cur); } } return membersByTag; } } // Represents the state of a node in the replica set. Instances of this class are immutable. @Immutable static class Node { Node(ServerAddress addr, Set<String> names, float pingTime, boolean ok, boolean isMaster, boolean isSecondary, LinkedHashMap<String, String> tags, int maxBsonObjectSize) { this._addr = addr; this._names = Collections.unmodifiableSet(new HashSet<String>(names)); this._pingTime = pingTime; this._ok = ok; this._isMaster = isMaster; this._isSecondary = isSecondary; this._tags = Collections.unmodifiableSet(getTagsFromMap(tags)); this._maxBsonObjectSize = maxBsonObjectSize; } private static Set<Tag> getTagsFromMap(LinkedHashMap<String,String> tagMap) { Set<Tag> tagSet = new HashSet<Tag>(); for (Map.Entry<String, String> curEntry : tagMap.entrySet()) { tagSet.add(new Tag(curEntry.getKey(), curEntry.getValue())); } return tagSet; } public boolean isOk() { return _ok; } public boolean master(){ return _ok && _isMaster; } public int getMaxBsonObjectSize() { return _maxBsonObjectSize; } public boolean secondary(){ return _ok && _isSecondary; } public ServerAddress getServerAddress() { return _addr; } public Set<String> getNames() { return _names; } public Set<Tag> getTags() { return _tags; } public String toJSON(){ StringBuilder buf = new StringBuilder(); buf.append( "{ address:'" ).append( _addr ).append( "', " ); buf.append( "ok:" ).append( _ok ).append( ", " ); buf.append( "ping:" ).append( _pingTime ).append( ", " ); buf.append( "isMaster:" ).append( _isMaster ).append( ", " ); buf.append( "isSecondary:" ).append( _isSecondary ).append( ", " ); buf.append( "maxBsonObjectSize:" ).append( _maxBsonObjectSize ).append( ", " ); if(_tags != null && _tags.size() > 0) buf.append( "tags:" ).append( JSON.serialize(_tags ) ); buf.append("}"); return buf.toString(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Node node = (Node) o; if (_isMaster != node._isMaster) return false; if (_maxBsonObjectSize != node._maxBsonObjectSize) return false; if (_isSecondary != node._isSecondary) return false; if (_ok != node._ok) return false; if (Float.compare(node._pingTime, _pingTime) != 0) return false; if (!_addr.equals(node._addr)) return false; if (!_names.equals(node._names)) return false; if (!_tags.equals(node._tags)) return false; return true; } @Override public int hashCode() { int result = _addr.hashCode(); result = 31 * result + (_pingTime != +0.0f ? Float.floatToIntBits(_pingTime) : 0); result = 31 * result + _names.hashCode(); result = 31 * result + _tags.hashCode(); result = 31 * result + (_ok ? 1 : 0); result = 31 * result + (_isMaster ? 1 : 0); result = 31 * result + (_isSecondary ? 1 : 0); result = 31 * result + _maxBsonObjectSize; return result; } private final ServerAddress _addr; private final float _pingTime; private final Set<String> _names; private final Set<Tag> _tags; private final boolean _ok; private final boolean _isMaster; private final boolean _isSecondary; private final int _maxBsonObjectSize; } // Simple class to hold a single tag, both key and value @Immutable static final class Tag { final String key; final String value; Tag(String key, String value) { this.key = key; this.value = value; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Tag tag = (Tag) o; if (key != null ? !key.equals(tag.key) : tag.key != null) return false; if (value != null ? !value.equals(tag.value) : tag.value != null) return false; return true; } @Override public int hashCode() { int result = key != null ? key.hashCode() : 0; result = 31 * result + (value != null ? value.hashCode() : 0); return result; } } // Represents the state of a node in the replica set. Instances of this class are mutable. static class UpdatableNode { UpdatableNode(ServerAddress addr, List<UpdatableNode> all, AtomicReference<Logger> logger, Mongo mongo, MongoOptions mongoOptions, AtomicReference<String> setName, AtomicReference<String> lastPrimarySignal) { _addr = addr; _all = all; _mongoOptions = mongoOptions; _port = new DBPort( addr , null , _mongoOptions ); _names.add( addr.toString() ); _logger = logger; _mongo = mongo; _setName = setName; _lastPrimarySignal = lastPrimarySignal; } private void updateAddr() { try { if (_addr.updateInetAddr()) { // address changed, need to use new ports _port = new DBPort(_addr, null, _mongoOptions); _mongo.getConnector().updatePortPool(_addr); _logger.get().log(Level.INFO, "Address of host " + _addr.toString() + " changed to " + _addr.getSocketAddress().toString()); } } catch (UnknownHostException ex) { _logger.get().log(Level.WARNING, null, ex); } } synchronized void update(Set<UpdatableNode> seenNodes){ try { long start = System.nanoTime(); CommandResult res = _port.runCommand( _mongo.getDB("admin") , _isMasterCmd ); long end = System.nanoTime(); float newPingMS = (end - start) / 1000000F; if (!successfullyContacted) _pingTimeMS = newPingMS; else _pingTimeMS = _pingTimeMS + ((newPingMS - _pingTimeMS) / latencySmoothFactor); _rootLogger.log( Level.FINE , "Latency to " + _addr + " actual=" + newPingMS + " smoothed=" + _pingTimeMS); successfullyContacted = true; if ( res == null ){ throw new MongoInternalException("Invalid null value returned from isMaster"); } if (!_ok) { _logger.get().log( Level.INFO , "Server seen up: " + _addr ); } _ok = true; _isMaster = res.getBoolean( "ismaster" , false ); _isSecondary = res.getBoolean( "secondary" , false ); _lastPrimarySignal.set( res.getString( "primary" ) ); if ( res.containsField( "hosts" ) ){ for ( Object x : (List)res.get("hosts") ){ String host = x.toString(); UpdatableNode node = _addIfNotHere(host); if (node != null && seenNodes != null) seenNodes.add(node); } } if ( res.containsField( "passives" ) ){ for ( Object x : (List)res.get("passives") ){ String host = x.toString(); UpdatableNode node = _addIfNotHere(host); if (node != null && seenNodes != null) seenNodes.add(node); } } // Tags were added in 2.0 but may not be present if (res.containsField( "tags" )) { DBObject tags = (DBObject) res.get( "tags" ); for ( String key : tags.keySet() ) { _tags.put( key, tags.get( key ).toString() ); } } // max size was added in 1.8 if (res.containsField("maxBsonObjectSize")) { _maxBsonObjectSize = (Integer) res.get("maxBsonObjectSize"); } else { _maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE; } if (res.containsField("setName")) { String setName = res.get( "setName" ).toString(); if ( _setName.get() == null ){ _setName.set(setName); _logger.set( Logger.getLogger( _rootLogger.getName() + "." + setName)); } else if ( !_setName.get().equals( setName ) ){ _logger.get().log( Level.SEVERE , "mismatch set name old: " + _setName.get() + " new: " + setName ); } } } catch ( Exception e ){ if (_ok) { _logger.get().log( Level.WARNING , "Server seen down: " + _addr, e ); } else if (Math.random() < 0.1) { _logger.get().log( Level.WARNING , "Server seen down: " + _addr, e ); } _ok = false; } } UpdatableNode _addIfNotHere( String host ){ UpdatableNode n = findNode( host, _all, _logger ); if ( n == null ){ try { n = new UpdatableNode( new ServerAddress( host ), _all, _logger, _mongo, _mongoOptions, _setName, _lastPrimarySignal ); _all.add( n ); } catch ( UnknownHostException un ){ _logger.get().log( Level.WARNING , "couldn't resolve host [" + host + "]" ); } } return n; } private UpdatableNode findNode( String host, List<UpdatableNode> members, AtomicReference<Logger> logger ){ for (UpdatableNode node : members) if (node._names.contains(host)) return node; ServerAddress addr; try { addr = new ServerAddress( host ); } catch ( UnknownHostException un ){ logger.get().log( Level.WARNING , "couldn't resolve host [" + host + "]" ); return null; } for (UpdatableNode node : members) { if (node._addr.equals(addr)) { node._names.add(host); return node; } } return null; } public void close() { _port.close(); _port = null; } final ServerAddress _addr; private final Set<String> _names = Collections.synchronizedSet( new HashSet<String>() ); private DBPort _port; // we have our own port so we can set different socket options and don't have to owrry about the pool final LinkedHashMap<String, String> _tags = new LinkedHashMap<String, String>( ); boolean successfullyContacted = false; boolean _ok = false; float _pingTimeMS = 0; boolean _isMaster = false; boolean _isSecondary = false; int _maxBsonObjectSize; double _priority = 0; private final AtomicReference<Logger> _logger; private final MongoOptions _mongoOptions; private final Mongo _mongo; private final AtomicReference<String> _setName; private final AtomicReference<String> _lastPrimarySignal; private final List<UpdatableNode> _all; } // Thread that monitors the state of the replica set. This thread is responsible for setting a new ReplicaSet // instance on ReplicaSetStatus.members every pass through the members of the set. class Updater extends Thread { Updater(List<ServerAddress> initial){ super( "ReplicaSetStatus:Updater" ); setDaemon( true ); _all = new ArrayList<UpdatableNode>(initial.size()); for ( ServerAddress addr : initial ){ _all.add( new UpdatableNode( addr, _all, _logger, _mongo, _mongoOptions, _setName, _lastPrimarySignal ) ); } _nextResolveTime = System.currentTimeMillis() + inetAddrCacheMS; } @Override public void run() { try { while (!Thread.interrupted()) { int curUpdateIntervalMS = updaterIntervalNoMasterMS; try { updateAll(); updateInetAddresses(); ReplicaSet replicaSet = new ReplicaSet(createNodeList(), _random, slaveAcceptableLatencyMS); _replicaSetHolder.set(replicaSet); if (replicaSet.hasMaster()) { _mongo.getConnector().setMaster(replicaSet.getMaster()); curUpdateIntervalMS = updaterIntervalMS; } } catch (Exception e) { _logger.get().log(Level.WARNING, "couldn't do update pass", e); } Thread.sleep(curUpdateIntervalMS); } } catch (InterruptedException e) { // Allow thread to exit } _replicaSetHolder.close(); closeAllNodes(); } public long getNextResolveTime() { return _nextResolveTime; } public synchronized void updateAll(){ HashSet<UpdatableNode> seenNodes = new HashSet<UpdatableNode>(); // make a copy of _all, since UpdatableNode.update can add to it for (UpdatableNode node : new ArrayList<UpdatableNode>(_all)) { node.update(seenNodes); } if (seenNodes.size() > 0) { // not empty, means that at least 1 server gave node list // remove unused hosts Iterator<UpdatableNode> it = _all.iterator(); while (it.hasNext()) { if (!seenNodes.contains(it.next())) it.remove(); } } } private List<Node> createNodeList() { List<Node> nodeList = new ArrayList<Node>(_all.size()); for (UpdatableNode cur : _all) { nodeList.add(new Node(cur._addr, cur._names, cur._pingTimeMS, cur._ok, cur._isMaster, cur._isSecondary, cur._tags, cur._maxBsonObjectSize)); } return nodeList; } private void updateInetAddresses() { long now = System.currentTimeMillis(); if (inetAddrCacheMS > 0 && _nextResolveTime < now) { _nextResolveTime = now + inetAddrCacheMS; for (UpdatableNode node : _all) { node.updateAddr(); } } } private void closeAllNodes() { for (UpdatableNode node : _all) { try { node.close(); } catch (final Throwable t) { /* nada */ } } } private final List<UpdatableNode> _all; private volatile long _nextResolveTime; private final Random _random = new Random(); } /** * Ensures that we have the current master, if there is one. If the current snapshot of the replica set * has no master, this method waits one cycle to find a new master, and returns it if found, or null if not. * * @return address of the current master, or null if there is none */ Node ensureMaster() { if (_closed) { return null; } Node masterNode = getMasterNode(); if (masterNode != null) { return masterNode; } _replicaSetHolder.waitForNextUpdate(); masterNode = getMasterNode(); if (masterNode != null) { return masterNode; } return null; } List<ServerAddress> getServerAddressList() { List<ServerAddress> addrs = new ArrayList<ServerAddress>(); for (Node node : _replicaSetHolder.get().getAll()) addrs.add(node.getServerAddress()); return addrs; } void close() { _closed = true; _updater.interrupt(); } /** * Gets the maximum size for a BSON object supported by the current master server. * Note that this value may change over time depending on which server is master. * @return the maximum size, or 0 if not obtained from servers yet. */ public int getMaxBsonObjectSize() { return _replicaSetHolder.get().getMaxBsonObjectSize(); } final ReplicaSetHolder _replicaSetHolder = new ReplicaSetHolder(); final Updater _updater; private final Mongo _mongo; private final AtomicReference<String> _setName = new AtomicReference<String>(); // null until init // will get changed to use set name once its found private final AtomicReference<Logger> _logger = new AtomicReference<Logger>(_rootLogger); private final AtomicReference<String> _lastPrimarySignal = new AtomicReference<String>(); private volatile boolean _closed; final static int updaterIntervalMS; final static int updaterIntervalNoMasterMS; final static int slaveAcceptableLatencyMS; final static int inetAddrCacheMS; final static float latencySmoothFactor; private final MongoOptions _mongoOptions; private static final MongoOptions _mongoOptionsDefaults = new MongoOptions(); static { updaterIntervalMS = Integer.parseInt(System.getProperty("com.mongodb.updaterIntervalMS", "5000")); updaterIntervalNoMasterMS = Integer.parseInt(System.getProperty("com.mongodb.updaterIntervalNoMasterMS", "10")); slaveAcceptableLatencyMS = Integer.parseInt(System.getProperty("com.mongodb.slaveAcceptableLatencyMS", "15")); inetAddrCacheMS = Integer.parseInt(System.getProperty("com.mongodb.inetAddrCacheMS", "300000")); latencySmoothFactor = Float.parseFloat(System.getProperty("com.mongodb.latencySmoothFactor", "4")); _mongoOptionsDefaults.connectTimeout = Integer.parseInt(System.getProperty("com.mongodb.updaterConnectTimeoutMS", "20000")); _mongoOptionsDefaults.socketTimeout = Integer.parseInt(System.getProperty("com.mongodb.updaterSocketTimeoutMS", "20000")); } static final DBObject _isMasterCmd = new BasicDBObject( "ismaster" , 1 ); }