/**
* Prepares {@link ReadPreference} from given {@link ReadPreferenceEnum}
*
* @param readPreferenceEnum Read preference enum value provided in config
* @return Read preference for mongo client options
*/
public static ReadPreference readPreference(ReadPreferenceEnum readPreferenceEnum) {
switch (readPreferenceEnum) {
case NEAREST:
return ReadPreference.nearest();
case PRIMARY:
return ReadPreference.primary();
case SECONDARY:
return ReadPreference.secondary();
case PRIMARY_PREFERRED:
return ReadPreference.primaryPreferred();
case SECONDARY_PREFERRED:
return ReadPreference.secondaryPreferred();
default:
return null;
}
}
java类com.mongodb.ReadPreference的实例源码
Utils.java 文件源码
项目:adeptj-modules
阅读 40
收藏 0
点赞 0
评论 0
MongoConfiguration.java 文件源码
项目:mongolastic
阅读 31
收藏 0
点赞 0
评论 0
private void prepareClient() {
try {
ServerAddress address = new ServerAddress(config.getMongo().getHost(), config.getMongo().getPort());
MongoClientOptions options = MongoClientOptions.builder()
.serverSelectionTimeout(5000)
.socketKeepAlive(false)
.readPreference(ReadPreference.primaryPreferred())
.sslInvalidHostNameAllowed(true)
.build();
client = connectToClient(address, options);
} catch (Exception ex) {
logger.error(ex.getMessage(), ex);
System.exit(-1);
}
}
MongoProperties.java 文件源码
项目:edison-microservice
阅读 26
收藏 0
点赞 0
评论 0
public MongoClientOptions toMongoClientOptions(final CodecRegistry codecRegistry) {
return builder()
.sslEnabled(sslEnabled)
.codecRegistry(codecRegistry)
.readPreference(ReadPreference.valueOf(readPreference))
.connectTimeout(connectTimeout)
.serverSelectionTimeout(serverSelectionTimeout)
.cursorFinalizerEnabled(true)
.maxWaitTime(maxWaitTime)
.maxConnectionLifeTime(connectionpool.getMaxLifeTime())
.threadsAllowedToBlockForConnectionMultiplier(connectionpool.getBlockedConnectionMultiplier())
.maxConnectionIdleTime(connectionpool.getMaxIdleTime())
.minConnectionsPerHost(connectionpool.getMinSize())
.connectionsPerHost(connectionpool.getMaxSize())
.build();
}
MongoDBConfig.java 文件源码
项目:datacollector
阅读 23
收藏 0
点赞 0
评论 0
public void init(
Stage.Context context,
List<Stage.ConfigIssue> issues,
ReadPreference readPreference,
WriteConcern writeConcern
) {
mongoClient = createClient(context, issues, readPreference, writeConcern);
if (!issues.isEmpty()) {
return;
}
mongoDatabase = createMongoDatabase(context, issues, readPreference, writeConcern);
if (!issues.isEmpty()) {
return;
}
mongoCollection = createMongoCollection(context, issues, readPreference, writeConcern);
}
MongoDBConfig.java 文件源码
项目:datacollector
阅读 41
收藏 0
点赞 0
评论 0
private MongoDatabase createMongoDatabase(
Stage.Context context,
List<Stage.ConfigIssue> issues,
ReadPreference readPreference,
WriteConcern writeConcern
) {
MongoDatabase mongoDatabase = null;
try {
if (readPreference != null) {
mongoDatabase = mongoClient.getDatabase(database).withReadPreference(readPreference);
} else if (writeConcern != null) {
mongoDatabase = mongoClient.getDatabase(database).withWriteConcern(writeConcern);
}
} catch (MongoClientException e) {
issues.add(context.createConfigIssue(
Groups.MONGODB.name(),
MONGO_CONFIG_PREFIX + "database",
Errors.MONGODB_02,
database,
e.toString()
));
}
return mongoDatabase;
}
MongoDBConfig.java 文件源码
项目:datacollector
阅读 29
收藏 0
点赞 0
评论 0
private MongoCollection createMongoCollection(
Stage.Context context,
List<Stage.ConfigIssue> issues,
ReadPreference readPreference,
WriteConcern writeConcern
) {
MongoCollection mongoCollection = null;
try {
if (readPreference != null) {
mongoCollection = mongoDatabase.getCollection(collection).withReadPreference(readPreference);
} else if (writeConcern != null) {
mongoCollection = mongoDatabase.getCollection(collection).withWriteConcern(writeConcern);
}
} catch (MongoClientException e) {
issues.add(context.createConfigIssue(
Groups.MONGODB.name(),
MONGO_CONFIG_PREFIX + "collection",
Errors.MONGODB_03,
collection,
e.toString()
));
}
return mongoCollection;
}
MongoUtils.java 文件源码
项目:mandrel
阅读 29
收藏 0
点赞 0
评论 0
public static void checkCapped(MongoDatabase database, String collectionName, int size, int maxDocuments, boolean delete) {
if (Lists.newArrayList(database.listCollectionNames()).contains(collectionName)) {
log.debug("'{}' collection already exists...", collectionName);
// Check if already capped
Document command = new Document("collStats", collectionName);
boolean isCapped = database.runCommand(command, ReadPreference.primary()).getBoolean("capped").booleanValue();
if (!isCapped) {
if (delete) {
database.getCollection(collectionName).drop();
database.createCollection(collectionName, new CreateCollectionOptions().capped(true).maxDocuments(maxDocuments).sizeInBytes(size));
} else {
log.info("'{}' is not capped, converting it...", collectionName);
command = new Document("convertToCapped", collectionName).append("size", size).append("max", maxDocuments);
database.runCommand(command, ReadPreference.primary());
}
} else {
log.debug("'{}' collection already capped!", collectionName);
}
} else {
database.createCollection(collectionName, new CreateCollectionOptions().capped(true).maxDocuments(maxDocuments).sizeInBytes(size));
}
}
MongoBlobStore.java 文件源码
项目:jackrabbit-dynamodb-store
阅读 29
收藏 0
点赞 0
评论 0
private MongoBlob getBlob(String id, long lastMod) {
DBObject query = getBlobQuery(id, lastMod);
// try the secondary first
// TODO add a configuration option for whether to try reading from secondary
ReadPreference pref = ReadPreference.secondaryPreferred();
DBObject fields = new BasicDBObject();
fields.put(MongoBlob.KEY_DATA, 1);
MongoBlob blob = (MongoBlob) getBlobCollection().findOne(query, fields, pref);
if (blob == null) {
// not found in the secondary: try the primary
pref = ReadPreference.primary();
blob = (MongoBlob) getBlobCollection().findOne(query, fields, pref);
}
return blob;
}
MongoMissingLastRevSeeker.java 文件源码
项目:jackrabbit-dynamodb-store
阅读 25
收藏 0
点赞 0
评论 0
@Override
public CloseableIterable<NodeDocument> getCandidates(final long startTime) {
DBObject query =
start(NodeDocument.MODIFIED_IN_SECS).greaterThanEquals(
NodeDocument.getModifiedInSecs(startTime))
.get();
DBObject sortFields = new BasicDBObject(NodeDocument.MODIFIED_IN_SECS, -1);
DBCursor cursor =
getNodeCollection().find(query)
.sort(sortFields)
.setReadPreference(ReadPreference.primary());
return CloseableIterable.wrap(transform(cursor, new Function<DBObject, NodeDocument>() {
@Override
public NodeDocument apply(DBObject input) {
return store.convertFromDBObject(Collection.NODES, input);
}
}), cursor);
}
MemoryDocumentStore.java 文件源码
项目:jackrabbit-dynamodb-store
阅读 31
收藏 0
点赞 0
评论 0
@Override
public void setReadWriteMode(String readWriteMode) {
if (readWriteMode == null || readWriteMode.equals(lastReadWriteMode)) {
return;
}
lastReadWriteMode = readWriteMode;
try {
Map<String, String> map = Splitter.on(", ").withKeyValueSeparator(":").split(readWriteMode);
String read = map.get("read");
if (read != null) {
ReadPreference readPref = ReadPreference.valueOf(read);
if (!readPref.equals(this.readPreference)) {
this.readPreference = readPref;
}
}
String write = map.get("write");
if (write != null) {
WriteConcern writeConcern = WriteConcern.valueOf(write);
if (!writeConcern.equals(this.writeConcern)) {
this.writeConcern = writeConcern;
}
}
} catch (Exception e) {
// unsupported or parse error - ignore
}
}
ReadPreferenceIT.java 文件源码
项目:jackrabbit-dynamodb-store
阅读 23
收藏 0
点赞 0
评论 0
@Test
public void testMongoReadPreferencesDefault() throws Exception{
assertEquals(ReadPreference.primary(),
mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PRIMARY));
assertEquals(ReadPreference.primaryPreferred(),
mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_PRIMARY));
//By default Mongo read preference is primary
assertEquals(ReadPreference.primary(),
mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY));
//Change the default and assert again
mongoDS.getDBCollection(NODES).getDB().setReadPreference(ReadPreference.secondary());
assertEquals(ReadPreference.secondary(),
mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY));
//for case where parent age cannot be determined the preference should be primaryPreferred
assertEquals(ReadPreference.primaryPreferred(),
mongoDS.getMongoReadPreference(NODES,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
//For collection other than NODES always primary
assertEquals(ReadPreference.primary(),
mongoDS.getMongoReadPreference(SETTINGS,"foo", DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
}
ReadPreferenceIT.java 文件源码
项目:jackrabbit-dynamodb-store
阅读 24
收藏 0
点赞 0
评论 0
@Test
public void testMongoReadPreferencesWithAge() throws Exception{
//Change the default
ReadPreference testPref = ReadPreference.secondary();
mongoDS.getDBCollection(NODES).getDB().setReadPreference(testPref);
NodeBuilder b1 = nodeStore.getRoot().builder();
b1.child("x").child("y");
nodeStore.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
String id = Utils.getIdFromPath("/x/y");
String parentId = Utils.getParentId(id);
mongoDS.invalidateCache(NODES,id);
//For modifiedTime < replicationLag primary should be preferred
assertEquals(ReadPreference.primaryPreferred(),
mongoDS.getMongoReadPreference(NODES,parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
//Going into future to make parent /x old enough
clock.waitUntil(Revision.getCurrentTimestamp() + replicationLag);
mongoDS.setClock(clock);
//For old modified nodes secondaries should be preferred
assertEquals(testPref,
mongoDS.getMongoReadPreference(NODES, parentId, DocumentReadPreference.PREFER_SECONDARY_IF_OLD_ENOUGH));
}
MongoLocking.java 文件源码
项目:lightblue-mongo
阅读 29
收藏 0
点赞 0
评论 0
public int getLockCount(String callerId, String resourceId) {
Date now = new Date();
BasicDBObject q = new BasicDBObject().
append(CALLERID, callerId).
append(RESOURCEID, resourceId).
append(EXPIRATION, new BasicDBObject("$gt", now)).
append(COUNT, new BasicDBObject("$gt", 0));
BasicDBObject field = new BasicDBObject(COUNT, 1);
DBObject lock = coll.findOne(q, field,ReadPreference.primary());
if (lock != null) {
int cnt = ((Number) lock.get(COUNT)).intValue();
LOGGER.debug("{}/{} lockCount={}", callerId, resourceId, cnt);
return cnt;
} else {
throw new InvalidLockException(resourceId);
}
}
MongoLocking.java 文件源码
项目:lightblue-mongo
阅读 29
收藏 0
点赞 0
评论 0
public void ping(String callerId, String resourceId) {
Date now = new Date();
BasicDBObject q = new BasicDBObject().
append(CALLERID, callerId).
append(RESOURCEID, resourceId).
append(EXPIRATION, new BasicDBObject("$gt", now)).
append(COUNT, new BasicDBObject("$gt", 0));
DBObject lock = coll.findOne(q,null,ReadPreference.primary());
if (lock != null) {
Date expiration = new Date(now.getTime() + ((Number) lock.get(TTL)).longValue());
int ver = ((Number) lock.get(VERSION)).intValue();
BasicDBObject update = new BasicDBObject().
append("$set", new BasicDBObject(TIMESTAMP, now).
append(EXPIRATION, expiration)).
append("$inc", new BasicDBObject(VERSION, 1));
q = q.append(VERSION, ver);
WriteResult wr = coll.update(q, update, false, false, WriteConcern.ACKNOWLEDGED);
if (wr.getN() != 1) {
throw new InvalidLockException(resourceId);
}
LOGGER.debug("{}/{} pinged", callerId, resourceId);
} else {
throw new InvalidLockException(resourceId);
}
}
BatchUpdate.java 文件源码
项目:lightblue-mongo
阅读 29
收藏 0
点赞 0
评论 0
/**
* Returns the set of document ids that were not updated with docver
*
* @param docver The current document version
* @param documentIds The document ids to scan
*
* @return The set of document ids that were not updated with docver
*/
public static Set<Object> getFailedUpdates(DBCollection collection,
ObjectId docver,
List<Object> documentIds) {
Set<Object> failedIds=new HashSet<>();
if(!documentIds.isEmpty()) {
// documents with the given _ids and whose docver contains our docVer are the ones we managed to update
// others are failures
BasicDBObject query=new BasicDBObject(DOCVER_FLD,new BasicDBObject("$ne",docver));
query.append("_id",new BasicDBObject("$in",documentIds));
try (DBCursor cursor = collection.find(query,new BasicDBObject("_id",1))
.setReadPreference(ReadPreference.primary())) {
while(cursor.hasNext()) {
failedIds.add(cursor.next().get("_id"));
}
}
}
return failedIds;
}
MongoConfigurationParseTest.java 文件源码
项目:lightblue-mongo
阅读 27
收藏 0
点赞 0
评论 0
@Test
public void readPreference() throws IOException {
try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("parse-test-datasources.json")) {
JsonNode node = JsonUtils.json(is);
MongoConfiguration metadataConfig = new MongoConfiguration();
metadataConfig.initializeFromJson(node.get("metadata_readPreference"));
MongoConfiguration dataConfig = new MongoConfiguration();
dataConfig.initializeFromJson(node.get("mongodata_readPreference"));
assertEquals(ReadPreference.nearest(), metadataConfig.getMongoClientOptions().getReadPreference());
assertEquals(ReadPreference.secondary(), dataConfig.getMongoClientOptions().getReadPreference());
assertEquals(WriteConcern.SAFE, metadataConfig.getWriteConcern());
}
}
MongoClientFactoryTest.java 文件源码
项目:dropwizard-mongodb
阅读 27
收藏 0
点赞 0
评论 0
public void correctlyExtractsMongoClientFromConfiguration() throws Exception {
final Example example = factory.build(testFile);
final MongoClient client = example.getMongoClient().build(environment);
assertThat(client.getAddress().getHost()).isIn("localhost", "127.0.0.1");
assertThat(client.getAddress().getPort()).isEqualTo(ServerAddress.defaultPort());
assertThat(client.getCredentialsList()).isEmpty();
final MongoClientOptions options = client.getMongoClientOptions();
assertThat(options.getDbDecoderFactory()).isEqualTo(DefaultDBDecoder.FACTORY);
assertThat(options.getDbEncoderFactory()).isEqualTo(DefaultDBEncoder.FACTORY);
assertThat(options.getReadPreference()).isEqualTo(ReadPreference.primary());
assertThat(options.getWriteConcern()).isEqualTo(WriteConcern.ACKNOWLEDGED);
assertThat(options.getSocketFactory()).isEqualTo(SocketFactory.getDefault());
}
MongoDBDriver.java 文件源码
项目:birt
阅读 29
收藏 0
点赞 0
评论 0
public static ReadPreferenceChoice getReadPreferenceChoice( ReadPreference readPref )
{
if( readPref == null )
return PRIMARY; // default
String readPrefName = readPref.getName();
if( readPrefName == ReadPreference.primary().getName() )
return PRIMARY;
if( readPrefName == ReadPreference.primaryPreferred().getName() )
return PRIMARY_PREFERRED;
if( readPrefName == ReadPreference.secondary().getName() )
return SECONDARY;
if( readPrefName == ReadPreference.secondaryPreferred().getName() )
return SECONDARY_PREFERRED;
if( readPrefName == ReadPreference.nearest().getName() )
return NEAREST;
return PRIMARY; // default
}
MongoClientTemplet.java 文件源码
项目:mongodb-orm
阅读 21
收藏 0
点赞 0
评论 0
@Override
public long count(String statement, Object parameter) {
logger.debug("Execute 'count' mongodb command. Statement '" + statement + "'.");
SelectConfig config = (SelectConfig) configuration.getConfig(statement);
if (config == null) {
throw new MongoDaoException(statement, "Count statement id '" + statement + "' not found.");
}
String collection = config.getCollection();
NodeEntry query = config.getQuery();
DB db = factory.getDataSource().getDB();
DBCollection coll = db.getCollection(collection);
Map<String, Object> q = (Map<String, Object>) query.executorNode(configuration, parameter);
DBObject queryDbo = new BasicDBObject(q);
logger.debug("Execute 'count' mongodb command. Query '" + queryDbo + "'.");
return coll.count(queryDbo, ReadPreference.secondaryPreferred());
}
MngOpLogReader.java 文件源码
项目:mongodb-rdbms-sync
阅读 30
收藏 0
点赞 0
评论 0
private FindIterable<Document> getCursor(){
MongoClient client = DBCacheManager.INSTANCE.getCachedMongoPool(mongoDbName, mongoUserName);
//MongoClient client = DBCacheManager.INSTANCE.getCachedMongoPool(mongoDbName, "ccwOplRO");
client.setReadPreference(ReadPreference.secondary());
MongoCollection<Document> collection =client.getDatabase(localDb).getCollection(oplogRs);
FindIterable<Document> it = collection.find(Filters.and(Filters.eq(NS, ns),Filters.gt(TS, lastReadTime)))
.cursorType(CursorType.TailableAwait).noCursorTimeout(true).maxAwaitTime(30, TimeUnit.MINUTES);
return it;
}
EclipseLinkConfiguration.java 文件源码
项目:jpa-unit
阅读 43
收藏 0
点赞 0
评论 0
private void configureClientOptions(final Map<String, Object> properties) {
final MongoClientOptions.Builder builder = MongoClientOptions.builder();
final String writeConcern = (String) properties.get(ECLIPSELINK_NOSQL_PROPERTY_MONGO_WRITE_CONCERN);
final String readPreference = (String) properties.get(ECLIPSELINK_NOSQL_PROPERTY_MONGO_READ_PREFERENCE);
if (writeConcern != null) {
builder.writeConcern(WriteConcern.valueOf(writeConcern));
}
if (readPreference != null) {
builder.readPreference(ReadPreference.valueOf(readPreference));
}
mongoClientOptions = builder.build();
}
HibernateOgmConfiguration.java 文件源码
项目:jpa-unit
阅读 25
收藏 0
点赞 0
评论 0
private void configureClientOptions(final Map<String, Object> properties) {
final MongoClientOptions.Builder builder = MongoClientOptions.builder();
setOptions(builder, (final String key) -> (String) properties.get(HIBERNATE_OGM_MONGODB_OPTIONS_PREFIX + "." + key));
final String writeConcern = (String) properties.get(HIBERNATE_OGM_MONGODB_WRITE_CONCERN);
final String readPreference = (String) properties.get(HIBERNATE_OGM_MONGODB_READ_PREFERENCE);
if (writeConcern != null) {
builder.writeConcern(WriteConcern.valueOf(writeConcern));
}
if (readPreference != null) {
builder.readPreference(ReadPreference.valueOf(readPreference));
}
mongoClientOptions = builder.build();
}
EclipseLinkConfigurationTest.java 文件源码
项目:jpa-unit
阅读 23
收藏 0
点赞 0
评论 0
@Test
public void testMongoClientOptions() {
// GIVEN
final Map<String, Object> properties = new HashMap<>();
when(descriptor.getProperties()).thenReturn(properties);
properties.put("eclipselink.nosql.property.mongo.db", "foo");
// it looks like only the two options below are supported by EclipseLink
final ReadPreference readPreference = ReadPreference.nearest();
final WriteConcern writeConcern = WriteConcern.JOURNALED;
properties.put("eclipselink.nosql.property.mongo.read-preference", readPreference.getName());
properties.put("eclipselink.nosql.property.mongo.write-concern", "JOURNALED");
final ConfigurationFactory factory = new ConfigurationFactoryImpl();
// WHEN
final Configuration configuration = factory.createConfiguration(descriptor);
// THEN
assertThat(configuration, notNullValue());
final MongoClientOptions clientOptions = configuration.getClientOptions();
assertThat(clientOptions, notNullValue());
assertThat(clientOptions.getReadPreference(), equalTo(readPreference));
assertThat(clientOptions.getWriteConcern(), equalTo(writeConcern));
}
Persistence.java 文件源码
项目:switchman
阅读 21
收藏 0
点赞 0
评论 0
@Bean
public MongoDbFactory mongoDbFactory() throws Exception {
MongoClientURI uri = new MongoClientURI(mongoDbUrl);
mongo = new MongoClient(uri);
mongo.setReadPreference(ReadPreference.primary());
mongo.setWriteConcern(WriteConcern.ACKNOWLEDGED);
return new SimpleMongoDbFactory(mongo, uri.getDatabase());
}
MongoDatabaseImpl.java 文件源码
项目:mongo-java-driver-rx
阅读 37
收藏 0
点赞 0
评论 0
@Override
public <TResult> Observable<TResult> runCommand(final Bson command, final ReadPreference readPreference,
final Class<TResult> clazz) {
return RxObservables.create(Observables.observe(new Block<SingleResultCallback<TResult>>() {
@Override
public void apply(final SingleResultCallback<TResult> callback) {
wrapped.runCommand(command, readPreference, clazz, callback);
}
}), observableAdapter);
}
FindIterable.java 文件源码
项目:mongofx
阅读 28
收藏 0
点赞 0
评论 0
public FindIterable(final MongoNamespace namespace,
final CodecRegistry codecRegistry,
final ReadPreference readPreference, final OperationExecutor executor,
final Bson filter, final FindOptions findOptions) {
this.namespace = notNull("namespace", namespace);
this.codecRegistry = notNull("codecRegistry", codecRegistry);
this.readPreference = notNull("readPreference", readPreference);
this.executor = notNull("executor", executor);
this.filter = notNull("filter", filter);
this.findOptions = notNull("findOptions", findOptions);
}
RefFindOne.java 文件源码
项目:clotho3crud
阅读 25
收藏 0
点赞 0
评论 0
RefFindOne(DBCollection collection, ReadPreference readPreference, ExtendedUnmarshaller unmarshaller, QueryFactory queryFactory, String query, Object... parameters) {
this.unmarshaller = unmarshaller;
this.collection = collection;
this.readPreference = readPreference;
this.queryFactory = queryFactory;
this.query = this.queryFactory.createQuery(query, parameters);
}
MongoDatabaseImpl.java 文件源码
项目:mongo-java-driver-reactivestreams
阅读 36
收藏 0
点赞 0
评论 0
@Override
public <TResult> Publisher<TResult> runCommand(final Bson command, final ReadPreference readPreference,
final Class<TResult> clazz) {
return new ObservableToPublisher<TResult>(observe(new Block<SingleResultCallback<TResult>>() {
@Override
public void apply(final SingleResultCallback<TResult> callback) {
wrapped.runCommand(command, readPreference, clazz, callback);
}
}));
}
MongoDatabaseImpl.java 文件源码
项目:mongo-java-driver-reactivestreams
阅读 37
收藏 0
点赞 0
评论 0
@Override
public <TResult> Publisher<TResult> runCommand(final ClientSession clientSession, final Bson command,
final ReadPreference readPreference, final Class<TResult> clazz) {
return new ObservableToPublisher<TResult>(observe(new Block<SingleResultCallback<TResult>>() {
@Override
public void apply(final SingleResultCallback<TResult> callback) {
wrapped.runCommand(clientSession, command, readPreference, clazz, callback);
}
}));
}
TestOperationExecutor.java 文件源码
项目:mongo-java-driver-reactivestreams
阅读 29
收藏 0
点赞 0
评论 0
@Override
public <T> void execute(final AsyncReadOperation<T> operation, final ReadPreference readPreference, final ClientSession session,
final SingleResultCallback<T> callback) {
readPreferences.add(readPreference);
clientSessions.add(session);
if (queueExecution) {
queuedReadOperations.add(operation);
queuedReadCallbacks.add(callback);
} else {
readOperations.add(operation);
callResult(callback);
}
}