Spring Reactive Data With Multi Couchbase Buckets

问题

因莫个实体的数据量较大,需要用单独的一个bucket进行存储以避免影响其他服务性能。但因使用了reactive的技术栈,所以无法使用传统的datasource进行多数据源配置,需自行寻找方法。

spring boot version: 2.2.6.RELEASE

关键依赖:org.springframework.boot:spring-boot-starter-data-couchbase-reactive

解决方法

通过 spring-data-couchbase github 看到通过继承AbstractCouchbaseConfiguration.java 对couchbase链接进行配置。找到 AbstractCouchbaseConfiguration.java ,看到同一目录下有 AbstractReactiveCouchbaseConfiguration.java, 通过java doc我们可以确定我们需要继承 AbstractReactiveCouchbaseConfiguration.java 来在 reactive 中进行配置。

1
2
3
4
5
6
7
8
9
/**
*
* Base class for Reactive Spring Data Couchbase configuration java config
*
* @author Subhashni Balakrishnan
*/
@Configuration
public abstract class AbstractReactiveCouchbaseConfiguration
extends AbstractReactiveCouchbaseDataConfiguration implements CouchbaseConfigurer {

继续看它的父类 AbstractReactiveCouchbaseDataConfiguration.java, 可以看到实际上是在父类中提供了实际调用的 RxJavaCouchbaseTemplate 实例,以及将 repo 和 RxJavaCouchbaseTemplate 之间进行绑定的 ReactiveRepositoryOperationsMapping。并且通过override它的 configureReactiveRepositoryOperationsMapping 方法,我们可以自行绑定 repo 或 实体 与 RxJavaCouchbaseTemplate 的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* Provides beans to setup reactive repositories in SDC using {@link CouchbaseConfigurer}.
*
* @author Subhashni Balakrishnan
*/
@Configuration
public abstract class AbstractReactiveCouchbaseDataConfiguration extends CouchbaseConfigurationSupport {

protected abstract CouchbaseConfigurer couchbaseConfigurer();

/**
* Creates a {@link RxJavaCouchbaseTemplate}.
*
* This uses {@link #mappingCouchbaseConverter()}, {@link #translationService()} and {@link #getDefaultConsistency()}
* for construction.
*
*
* @throws Exception on Bean construction failure.
*/
@Bean(name = BeanNames.RXJAVA1_COUCHBASE_TEMPLATE)
public RxJavaCouchbaseTemplate reactiveCouchbaseTemplate() throws Exception {
RxJavaCouchbaseTemplate template = new RxJavaCouchbaseTemplate(couchbaseConfigurer().couchbaseClusterInfo(),
couchbaseConfigurer().couchbaseClient(), mappingCouchbaseConverter(), translationService());
template.setDefaultConsistency(getDefaultConsistency());
return template;
}

/**
* Creates the {@link ReactiveRepositoryOperationsMapping} bean which will be used by the framework to choose which
* {@link RxJavaCouchbaseOperations} should back which {@link ReactiveCouchbaseRepository}.
* Override {@link #configureReactiveRepositoryOperationsMapping} in order to customize this.
*
* @throws Exception
*/
@Bean(name = BeanNames.REACTIVE_COUCHBASE_OPERATIONS_MAPPING)
public ReactiveRepositoryOperationsMapping reactiveRepositoryOperationsMapping(RxJavaCouchbaseTemplate couchbaseTemplate) throws Exception {
//create a base mapping that associates all repositories to the default template
ReactiveRepositoryOperationsMapping baseMapping = new ReactiveRepositoryOperationsMapping(couchbaseTemplate);
//let the user tune it
configureReactiveRepositoryOperationsMapping(baseMapping);
return baseMapping;
}


/**
* In order to customize the mapping between repositories/entity types to couchbase templates,
* use the provided mapping's api (eg. in order to have different buckets backing different repositories).
*
* @param mapping the default mapping (will associate all repositories to the default template).
*/
protected void configureReactiveRepositoryOperationsMapping(ReactiveRepositoryOperationsMapping mapping) {
//NO_OP
}
}

综上,我们整体思路就是继承 AbstractReactiveCouchbaseConfiguration.java,并新建RxJavaCouchbaseTemplate实例与新的bucket相关联,再绑定相对应的mapping关系。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Configuration
@Slf4j
public class CouchbaseConfig extends AbstractReactiveCouchbaseConfiguration {

private CouchbaseProperties couchbaseProperties;
private String bucketName;
private String bucketPass;

public CouchbaseConfig(CouchbaseProperties couchbaseProperties,
@Value("${couchbase.bucket.name}") String bucketName,
@Value("${couchbase.bucket.password}") String bucketPass) {
this.couchbaseProperties = couchbaseProperties;
this.bucketName = bucketName;
this.bucketPass = bucketPass;
}

private Bucket binaryBucket() throws Exception {
return couchbaseCluster().openBucket(bucketName,
bucketPass);
}

private RxJavaCouchbaseTemplate binaryTemplate() throws Exception {
RxJavaCouchbaseTemplate template = new RxJavaCouchbaseTemplate(
couchbaseClusterInfo(), binaryBucket(),
mappingCouchbaseConverter(), translationService());
template.setDefaultConsistency(getDefaultConsistency());
return template;
}

@Override
public void configureReactiveRepositoryOperationsMapping(ReactiveRepositoryOperationsMapping baseMapping) {
try {
baseMapping.mapEntity(SomeEntity.class, binaryTemplate());
// 如果是同一entityClass但有多个bucket,可以新建一个Repository,使用
// baseMapping.mapEntity(SomeRepo.class, binaryTemplate()); 来配置另一个bucket
} catch (Exception ex) {
log.error("Error in creating mapping for {} bucket", bucketName, ex);
}
}

@Override
protected List<String> getBootstrapHosts() {
return couchbaseProperties.getBootstrapHosts();
}

@Override
protected String getBucketName() {
return couchbaseProperties.getBucket().getName();
}

@Override
protected String getBucketPassword() {
return couchbaseProperties.getBucket().getPassword();
}
}

参考链接