Google Analytics

суббота, 2 октября 2010 г.

Практическое знакомство с Cassandra: Часть 3 Учебное приложение MyBlog


В этом посте речь пойдет о разработке учебного приложения блога, в качестве хранилища данных для которого будет применяться Cassandra. Для чтения этого материала желательно ознакомиться с предыдущими постами на эту тему "Часть 1 Модель данных и установка" и "Часть 2 Типы сортировщиков колонок, TimeUUIDHelper и генератор кода Thrift".
ПРЕДУПРЕЖДЕНИЕ! Используйте код только в ознакомительных целях. Код не предназначен для использования в реальных приложениях, он содержит недоработки и может содержать ошибки.

Постановка задачи

Разработать консольное приложение MyBlog. Приложение должно поддерживать следующие функции: создавать необходимый keyspace в Cassandra; создавать, удалять и выводить на экран пользователей; создавать и выводить на экран посты пользователей. (Количество поддерживаемых функций приложения выбрано из расчета начальной демонстрации работы с Cassandra)

Пространство ключей (keyspace)

Пространство ключей для нашего приложения называется blog и включает в себя два семейства колонок (column family). Первое обычное(standard) семейство колонок users. Второе супер (super) timelines. Семейство users содержит полные данные о всех пользователях системы (ключ - строковое значение логина пользователя; колонки - полное имя (fullname), интересы (interests) и почтовый адрес (mail)). Семейство timelines содержит данные в хронологическом порядке, в нашем случае это будут посты (ключ - "posts" для постов; супер колонки - TimeUUID время создания постов; колонки для постов - название (name), тело (body), логин автора (login)).

users
neo fullname interests mail
Neo IT neo@gmail.com

timelines
posts TimeUUID TimeUUID
name body login name body login
... ... agent ... ... agent


Источник данных (Data Source)

Для работы с Cassandra нам необходимо иметь подключение к одному из узлов кластера. Код отвечающий за подключение реализован в классе my.blog.DataSource. В первую очередь необходимо создать объект транспорта TFramedTransport, далее на основе транспорта объект протокола TBinaryProtocol и затем клиента Cassandra.Client. Хочу обратить внимание, что начиная с версии Cassandra 0.7 транспорт, по умолчанию, работает во Framed режиме. Для подключения к узлу вызывается метод open у объекта транспорта. Объект клиента реализует основные функции API по работе с данными. Итак, код создания клиента расположен в конструкторе DataSource и выглядит следующим образом:

public DataSource(String host, int port) {
...
    transport = new TFramedTransport(new TSocket(host, port));
    TProtocol proto = new TBinaryProtocol(transport);
    client = new Cassandra.Client(proto);
...

Кроме ответственности за инициализацию подключения и объекта клиента, код DataSource отвечает за создания keyspace в Cassandra, которое будет использовать приложение блога. В конструкторе класса инициализируются два семейства колонок usersColumnFamily и timelinesColumnFamily согласно описанию keyspace. А код из метода createKeyspace инициализирует пространство в кластере Cassandra.

public DataSource(String host, int port) {
    usersColumnFamily = new CfDef(keyspace, "users");
    usersColumnFamily.setColumn_type("Standard");
    usersColumnFamily.setComment(
        "column family with login key and fullname, interests, mail columns"
            );
    usersColumnFamily.setGc_grace_seconds(0);
        
    timelinesColumnFamily = new CfDef(keyspace, "timelines");
    timelinesColumnFamily.setColumn_type("Super");
    timelinesColumnFamily.setComparator_type("TimeUUIDType");
    timelinesColumnFamily.setComment(
        "column family with timeline name key and sorted TimeUUIDType columns"
            );
    timelinesColumnFamily.setGc_grace_seconds(0);
...

Внимание на две строчки. Первая timelinesColumnFamily.setComparator_type("TimeUUIDType"); Она отвечает за установку компаратора TimeUUIDType, который обеспечивает порядок хранения колонок в хронологическом виде (тип название колонок TimeUUID). Вторая timelinesColumnFamily.setGc_grace_seconds(0); Это установка времени ожидания перед сборкой мусора, значение важно для распределенного удаления.

Сущности (Entities)

Два класса сущностей Posts и Users представляют посты и пользователей соответственно. Это классы данных реализованные в соответствии с JavaBeans и выполняющие роль схожую с ролью Entity в JPA. Вот атрибуты обоих классов:

public class Posts {

    private UUID uuid;
    private String name;
    private String body;
    private String login;
...

public class Users {

    private String login;
    private String fullname;
    private String interests;
    private String mail;
...

Объект доступа к данным (Data Access Object)

Класс MyBlogDAO реализует паттерн DAO. С помощью объекта этого класса осуществляется все работа с хранилищем данных. Этот класс содержит примеры API Cassandra, которые и являются центральной темой серии постов. Рассмотрим пример добавления пользователя (метод insertUser). Метод для вставки данных в хранилище реализован в классе клиента и называется insert. Сигнатура метода описывает следующие параметры: ключ, родитель колонки, колонка и уровень согласованности. В качестве ключа, согласно keyspace, выступает логин пользователя. Родитель колонки это объект ссылающийся на семейство колонок(или суперколонку). Колонка - значение колонки (наименование, значение и временная метка). Уровень согласованности это значение отвечающее за необходимое количество узлов кластера при ответе от которых операция считается выполненной. Таким образом при добавлении пользователя мы инициализируем все колонки с данными для заданного логина.

public void insertUser(Users user) throws DataSourceException {
    ColumnParent columnParent = 
        new ColumnParent(dataSource.getUsersColumnFamily().getName());
    byte[] key = null;
    try {
        key = user.getLogin().getBytes("UTF-8");
    } catch (UnsupportedEncodingException e) {
        DataSource.throwException(e);
    }
    Clock clock = new Clock(System.currentTimeMillis());
    
    try {
        dataSource.getClient().insert(
                key, 
                columnParent, 
                new Column(
                    "fullname".getBytes("UTF-8"), 
                    user.getFullname().getBytes("UTF-8"), 
                    clock
                    ), 
                dataSource.getWriteingConsistencyLevel()
                );
            
        dataSource.getClient().insert(
                key, 
                columnParent, 
                new Column(
                    "interests".getBytes("UTF-8"), 
                    user.getInterests().getBytes("UTF-8"), 
                    clock
                    ), 
                    dataSource.getWriteingConsistencyLevel()
                    );
            
        dataSource.getClient().insert(
                key, 
                columnParent, 
                new Column(
                    "mail".getBytes("UTF-8"), 
                    user.getMail().getBytes("UTF-8"), 
                    clock
                    ), 
                    dataSource.getWriteingConsistencyLevel()
                    );
    } catch (Throwable e) {
        DataSource.throwException(e);
    }
}

В коде выше отсутствует обработка ошибки записи. В реальном приложение необходимо повторно инициализировать запись при ошибке, для приведения данных в согласованное состояние. В противном случае возможен вариант, когда сохраниться только часть полей, а остальные данные будут утеряны.
Теперь рассмотрим выборку данных из семейства суперколонок timelines. Реализация функции возвращения постов из хранилища данных находится в методе getTimelinePosts. Чтобы извлечь необходимое количество суперколонок используется метод get_slice, который возвращает список объектов ColumnOrSuperColumn. Для выборки мы задаем диапазон среза SliceRange, в котором указывает количество необходимых нам колонок и устанавливаем признак инвертирования порядка. Т.е. мы выбираем n-ое количество самых новых постов отсортированных в порядке увеличения времени с момента публикации.

public List<Posts> getTimelinePosts(int count) throws DataSourceException {
    List<Posts> posts = new ArrayList<Posts>();

    ColumnParent columnParent = new ColumnParent(dataSource
            .getTimelinesColumnFamily().getName());
    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setReversed(true);
    if (count != 0) {
        sliceRange.setCount(count);
    }
    sliceRange.setStart(new byte[0]);
    sliceRange.setFinish(new byte[0]);
    predicate.setSlice_range(sliceRange);

    KeyRange range = new KeyRange();

    range.setStart_key(new byte[0]);
    range.setEnd_key(new byte[0]);

    try {
        List<ColumnOrSuperColumn> postsSlice = dataSource.getClient()
        .get_slice("posts".getBytes("UTF-8"), columnParent,
            predicate, dataSource.getReadingConsistencyLevel());

        for (ColumnOrSuperColumn currentColumn : postsSlice) {
            SuperColumn sc = currentColumn.getSuper_column();

            Posts post = new Posts(TimeUUIDHelper.getInstance()
                .byteArrayToUUID(sc.getName()), null, null, null);

            for (Column col : sc.getColumns()) {
                String name = new String(col.getName(), "UTF-8");
                String value = new String(col.getValue(), "UTF-8");

                if ("name".equals(name)) {
                    post.setName(value);
                } else if ("body".equals(name)) {
                    post.setBody(value);
                } else if ("login".equals(name)) {
                    post.setLogin(value);
                }
            }

            posts.add(post);
        }

    } catch (Throwable e) {
        DataSource.throwException(e);
    }

    return posts;
}

Удаление данных в Cassandra

Для Cassandra keyspace задается ReplicationFactor, который определяет количество узлов, на которых должны быть реплицированы колонки каждого ключа. Клиент определяет сколько реплик должны быть выполнены при записи. Если уровень согласованности задает количество релик меньше чем общее их количество, то Cassandra ответит, что запись успешна, даже если некоторые узлы в нерабочем состоянии.
Операция удаления не может очистить все следы существования данных немедленно. Если реплика не получит запрос об удалении, то потом когда она станет доступной, она будет вести себя как будто она имеет новые данные на запись и распространит их на другие хосты. Для избежания этого, Cassandra заменят значение удаленных данных на tombsone(надгробие). И распространяет эти данные, также как запись новых значений. Удаляются же они при сборки мусора, вспомним значение gc_grace_seconds.

Приложение MyBlog

Мы добрались до этапа, когда можно запустить приложение (ссылка на исходный код в разделе дополнительных источников). Выполним приложение со следующими параметрами командной строки:

-create -keyspace
-create -users -login neo -fullname "Neo" -interests "IT" -mail "neo@gmail.com"
-create -users -login agent -fullname "Agent Smith" -interests "Matrix" -mail "smith@gmail.com"
-create -posts -name "To Neo" -body "I'll kill you" -login agent
-create -posts -name "To Neo2" -body "I'll kill you" -login agent
-create -posts -name "To Neo3" -body "I'll kill you" -login agent

Эти параметры для инициализации keyspace и загрузки данных. Запуск с параметрами -show -posts 5 выведет на консоль созданные посты:

Posts:
Name: To Neo3
Body: I'll kill you
Time: Mon Sep 20 14:01:32 ALMT 2010
Author: agent

Name: To Neo2
Body: I'll kill you
Time: Mon Sep 20 14:01:25 ALMT 2010
Author: agent

Name: To Neo
Body: I'll kill you
Time: Mon Sep 20 14:01:04 ALMT 2010
Author: agent

Дополнительные источники

4 комментария:

  1. Иван, а можно вас попросить написать про CounterColumn?

    ОтветитьУдалить
  2. К сожалению, в ближайшем будущем я не смогу выделить время на это. Возможно позже.

    ОтветитьУдалить
  3. версии используемых библиотек отсутствуют. на maven собрать хотел. а подбирать тыкая пальцем в небо...((((

    Статья очень интересная, очень помогла. В работе приложение вот не увидел - это жаль.

    ОтветитьУдалить
  4. Версии ПО указаны указаны в первой из статей по Cassandra (Cassandra 0.7.0 beta1, thrift ревизия 959516 из svn репозитория). Статья достаточно старая, так что актуальность API оставляет желать лучшего. И спасибо за отзыв!

    ОтветитьУдалить