在写正文之前先讲下什么是kafka
kafka
kafka是一个消息系统 再具体点是发布-订阅制的消息系统,它依赖于zookeeper。也可以把他看作一个数据池。
kafka有这么几个概念
- 生产者
顾名思义是指数据的生产者 - 消费者
同理,消费者是指数据的消费者 - 代理
代理是负责维护发布数据的简单系统。每个代理中的每个主题可以具有零个或多个分区。 - 主题
主题属于特定类别的消息流称为主题。 数据存储在主题中。主题被拆分成分区。 对于每个主题,Kafka保存一个分区的数据。 每个这样的分区包含不可变有序序列的消息。 分区被实现为具有相等大小的一组分段文件 -
分区
主题可能有许多分区,因此它可以处理任意数量的数据。
举个例子,把kafka看作一个大型物流中转站,所有卖家的商品(生产者生产的数据)都会输入到这里。那么这个中转站(kafka)就会对物品进行分类处理(主题处理),分为,医药,家电,办公,建材之类的。又因为距离,效率等问题,不便于及时服务好客户,于是便设立代理,紧接着同类物品分发到各个代理,所以就会有分区概念。最后物品到达消费者手里。
中间还会设计到leader和follower,leader以看作快递员中的Boss,它负责给指定分区读取写入,那么剩下的快递员都是follower,当leader挂掉,剩下的follower会重新选取一个新的leader负责之前的工作,这样就不会出现因为服务挂掉,导致数据丢失问题。
image.png
kylin集成kafka
先看下开源kylin的源码目录结构
image.png
config目录下的都是写配置类,包括id,host,port,cluster等目的是告诉程序去哪里取数据,然后把数据存储到哪里。
hadoop 目录下类 如下:
image.png
到这里发现kylin的UI界面提供的一些配置参数,其实都是配置文件的参数,只不过把做了前后端隔离,填写的数据被调用,组成一个job被挂起
如果一个job一致挂起,然后cube会有很多的segment,所以需要合并,合并就用到了MergeOffsetStep类,合并目的也是为了减少存储空间。达到优化cube的目的
package org.apache.kylin.source.kafka.job;
import java.io.IOException;
import java.util.Collections;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentRange.TSRange;
import org.apache.kylin.metadata.model.Segments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import
/**
*/
public class MergeOffsetStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(MergeOffsetStep.class);
public MergeOffsetStep() {
super();
}
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
final CubeInstance cubeCopy = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
final String segmentId = CubingExecutableUtil.getSegmentId(this.getParams());
final CubeSegment segCopy = cubeCopy.getSegmentById(segmentId);
Preconditions.checkNotNull(segCopy, "Cube segment '" + segmentId + "' not found.");
Segments<CubeSegment> mergingSegs = cubeCopy.getMergingSegments(segCopy);
Preconditions.checkArgument(mergingSegs.size() > 0, "Merging segment not exist.");
Collections.sort(mergingSegs);
final CubeSegment first = mergingSegs.get(0);
final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
segCopy.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
segCopy.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
segCopy.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
segCopy.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd()));
CubeUpdate update = new CubeUpdate(cubeCopy);
update.setToUpdateSegs(segCopy);
try {
cubeManager.updateCube(update);
return ExecuteResult.createSucceed();
} catch (IOException e) {
logger.error("fail to update cube segment offset", e);
return ExecuteResult.createError(e);
}
}
}
总的来说集成的过程大概是这样的。
1.按照一定的格式生成数据
2.按照一定格式解析读取数据
3.将数据分发到Hadoop上
4.事实表load新增数据
5.建立cube
6.cube 合并更新操作