Let's write β

プログラミング中にできたことか、思ったこととか

Rails5 + MySQLでArmgとDBコネクタの拡張を組み合わせて位置情報を扱う

背景

サービスで位置情報を色々なところで扱うのですが MySQLで扱うためには当然Geometry型のカラムを用意してActiveRecordとつなぐ必要があり PostGisを使っているケースが多く、MySQL周りのライブラリはあまり活発ではありませんでした。

RGeoのMySQL2コネクタは停滞している

それっぽいライブラリは有るんですが 更新があまりされていなくて、Rails5への対応もずっと止まっていて少し不安です。

なので、対応した別のライブラリを探すか、これを使うなら自分でPRを凄く送る等が必要かなと思います。

救世主Armg

そんなことをモヤモヤ前から考えていて、自作は流石に大変そうだなと思ってたのですが、 Armgというライブラリが見つかって助かりました Railsで位置情報を扱ってる情報の流通量がそもそも少ないので新しいライブラリの情報を見つけづらいというのはありますね。

データベースへのシリアライズ

Armgでは、というか一般的に独自のカラム型をActiveRecordに追加するには Active Record::Type::Valueを継承する必要があります。 Armgでも以下のように:geometry型を新規に定義しています。

class Armg::MysqlGeometry < ActiveModel::Type::Value
  def type
    :geometry
  end

  def deserialize(value)
    if value.is_a?(::String)
      Armg.deserializer.deserialize(value)
    else
      value
    end
  end

  def serialize(value)
    if value.nil?
      nil
    else
      Armg.serializer.serialize(value)
    end
  end
end

このように定義して、

ActiveRecord::ConnectionAdapters::AbstractMysqlAdapter::NATIVE_DATABASE_TYPES[:geometry] = { name: 'geometry' }
ActiveRecord::Type.register(:geometry, Armg::MysqlGeometry, adapter: :mysql2)

のようにして、データ型をActiveRecordに登録してやっています。

Armg.serializer

Active Record::Type::Valueでは、値をActiveRecordの情報から、 DBへのQueryに埋め込む形式に変えるために、serializeというメソッドが呼び出されます。 拡張なしのActiveRecordで対応しているのは

  • String
  • Numeric
  • Date
  • Time
  • Symbol
  • true
  • false
  • nil

くらいだと思います、なので、通常新規の型を作るなら、これらへの変換をserializeの中でやる必要があります。

Armgへの登録タイミング

ArmgではカスタムのSerializerを定義できるので、 SRIDを適切にGPSの地理座標系に指定するために以下のようにSerializer, Deserializer合わせて定義しました。

require 'rgeo'
require 'armg'

class GeometryDeserializer
  def initialize
    factory = ::RGeo::Geographic.spherical_factory(srid: 4326)
    @base = ::Armg::WkbDeserializer.new(factory: factory)
  end

  def deserialize(mysql_geometry)
    @base.deserialize(mysql_geometry)
  end
end
class GeometrySerializer
  def initialize
    factory = ::RGeo::Geographic.spherical_factory(srid: 4326)
    @base = ::Armg::WkbSerializer.new(factory: factory)
    @wkt_parser = ::RGeo::WKRep::WKTParser.new(
      factory, 
      default_srid: 4326)
  end

  def serialize(value)
    if value.is_a?(String)
      value = @wkt_parser.parse(value)
    end
    @base.serialize(value)
  end

このようなカスタムのSerializerを定義してやりました。 これをArmgに

Armg.serializer = GeometrySerializer.new

登録してやる必要があるのですが、このタイミングが重要です。 initializersのなかで適当に上記の行を実行しようとするとArmg周りの定義で死にます。 実はArmgは

ActiveSupport.on_load(:active_record) do
....
end

というブロックの中でrequire等が行われているので、initializerの段階ではまだArmgの型等がUndefinedだったりします。 そんな状況でnewしようとするので当然死にます。

なので、以下のように

require 'active_support/lazy_load_hooks'
require 'rgeo'
require 'armg'

ActiveSupport.on_load(:active_record) do
    Armg.serializer = ::GeometrySerializer.new
    Armg.deserializer = ::GeometryDeserializer.new
    ActiveRecord::ConnectionAdapters::AbstractMysqlAdapter.prepend(::ArmgConnectionAdapterExt)
end

こちらの登録についても同様にハンドラに登録してやって遅延させる必要があります。

ログが死ぬ

ここまでで本来は問題ないのですが、ここで問題が起こります。 RailsSQLを実行するとそのログを表示しようとするのですが、この表示時に 上記のserializerではWKBと呼ばれるバイナリ形式をつかってSQLにgeometryデータを流し込もうとします。 これがRails上でログ解析時に異常なUTF-8文字列だと認識されて例外を吐くようになります。

SQLにも問題はなく、Serialize、Deserialize普通にできるのですが、 FactoryGirl等で書き込むときに例外を吐かれるとテストが通りません。

AdapterExtを利用してSQLへのエンコーディングの方法を変える

ST_GeomFromWKBを使いたい

WKBで送ろうとすると\x01....的な文字列(に見える形)で送られ、こいつがrubyに引っかかるので このような文字列形式じゃない形で送る必要があります。そこで利用するのが

ST_GeomFromWKB(0x01......., <SRID>)

という16進数とSRIDを指定する形式です。これならエラーは出ないでしょう。

そこで、正常にRubyが処理できる形式でSQLに織り込んでやる必要があります。 前述のように新規のデータ型を追加するには、Active Record::Type::Valueを継承してやって serializeの中で適宜変換してやる必要があるのですが、SQLへの埋め込みは実際にはそれではうまく行きません。

そもそも前述のようにserializeから本来返せるデータ型には制限があります。 今回使うのに適切そうなのはStringだと思いました。 ST_GeomFromWKBのSQL関数文字列を素直にserializeから返してやればいいと。

しかし、そもそもserializeは文字列型で渡されるとSQLには「その文字列をそのまま文字列としてSQLに渡そう」とします。 (当然の挙動ですが)なので、単にST...という文字列を返してしまうとSQLには

UPDATE INTO .... VALUES("ST_GeomFromWKB(....)")

というふうに文字列形式で吐き出されてしまってSQLの関数呼び出しではなくなってしまいます。

AdapterExtでquoteに手を入れる

そのため、Type::Valueのレベルではハンドリングできないことがわかりました。 では、ユーザーはSQLは理解してくれるはずの独自形式を作れないのでしょうか?

そのようなときに使うのがDBへのコネクタへの拡張です。

上のコードで

ActiveRecord::ConnectionAdapters::AbstractMysqlAdapter.prepend(::ArmgConnectionAdapterExt)

の用にしているのがソレです。

これは、DBへのアダプタのチェインにクラスを追加するもので、ベースとなる DBへのコネクタが値をSQLに変換するまでの過程に手を加えることができます。

そのなかでも、quote(value_, column_=nil)というメソッドが重要になってきます。 このメソッドは ある値をSQL そのまま埋め込んで良い文字列に変換するためのメソッドです。 イメージとしては返された文字列が直接SQLに結合されるというイメージです。

つまりたとえば"hoge"という文字列を返したときにSQL上で"hoge"と引用符がついた状態で 埋め込まれるのではなくhogeSQLの一部として埋め込まれるのです。

これを使えば、SQLの中に直接先程のST_GeomFromWKBを埋め込むことができます。

そこで、先程のGeometrySerializerを以下のように変更します。

lass GeometrySerializer
  def initialize
    factory = ::RGeo::Geographic.spherical_factory(srid: 4326)
    @base = ::Armg::WkbSerializer.new(factory: factory)
    @wkt_parser = ::RGeo::WKRep::WKTParser.new(
      factory, 
      default_srid: 4326)
  end

  def serialize(value)
    if value.is_a?(String)
      value = @wkt_parser.parse(value)
    end
    return value
  end
end

WKBエンコードされた文字列を返すのではなく、一旦RGeoのGeometryデータ型を そのまま返すようにしました。

そして、ArmgConnectionAdapterExtを以下のようにRGeoのデータを特別扱いするように 定義して、ソレ以外は親に任せるように定義してやります。

module ArmgConnectionAdapterExt
    def quote(value_, column_=nil)
          if ::RGeo::Feature::Geometry.check_type(value_)
            "ST_GeomFromWKB(0x#{::RGeo::WKRep::WKBGenerator.new(hex_format: true, little_endian: true).generate(value_)},#{value_.srid})"
          else
            super
          end
    end
end

このようにすることで、RGeoのデータをSQLに書き込むときにWKBの文字列形式で書き込まれるのではなく ラップされた形で書き込まれるようになります。 例えば以下のような感じです。

UPDATE `prohibited_areas` SET `geo_polygon` = ST_GeomFromWKB(0x01010000000000000000003e400000000000003e40,4326), `updated_at` = '2017-10-25 09:18:44' WHERE `prohibited_areas`.`id` = 1

これで無事にログ上にも正常にSQLが表示されるようになり、例外は発生することはなくなりました。

得られるおまけの利益

このようにRGeoのオブジェクトをSQLに安定して埋め込めるようになると 以下のようなscopeも書けるようになります。

scope :contain_latlng, ->(lat, lng) {
    factory = RGeo::Geographic.spherical_factory(srid: 4326)
    where('ST_CONTAINS(geo_polygon, ?)', factory.point(lng, lat))
}

この?の部分で先程のquoteが使われています。 なので、

ServiceArea.contain_latlng(100, 40)
  ServiceArea Load (0.4ms)  SELECT `service_areas`.* FROM `service_areas` WHERE (ST_CONTAINS(geo_polygon, ST_GeomFromWKB(0x010100000000000000000044400000000000805640,4326)))

このように、きちんと正しいSQLが安定して発行されるようになり、 自然にSQLで位置情報を扱えるようになりました。

位置情報のハッシュをS2 Geometryライブラリを利用して計算して地球上の多角形を被覆してみた

前回の記事では、S2 Geometryライブラリを使って地球上の円形を覆っていました。

poketo7878-dev.hatenablog.com

その時、「まだ多角形ポリゴンのサポートはされていない」と書いていました。 その後、そのバージョンアップが対応され、内部に穴を持たない多角形(ループと呼ばれていますs2.Loop)のセル計算がサポートされました。

なので、今回はそのあたりについて調査してみましたのでまとめます。

エンコードされたポリラインのデコード

GoogleMapではポリゴンのデータ等をエンコードして文字列に変換することができます。 この方式では直前の点からの差分をつかってエンコードするので、ある程度のサイズのポリゴンデータであっても短くエンコードすることができますので、 データベース等に格納する際はこの形式を検討してみても良いかもしれません。

そんなエンコードされたデータをデコードして、もとの緯度経度のリストに復元してくれるGoライブラリは以下のものがあります:

github.com

ループ

ループはS2ライブラリ上での「最初と最後を結んで閉じられる緯度経度のループ」を意味しています。 緯度経度の順序はループの縁に沿って反時計回り(CCW)に設定されている前提で扱われており、ループの内側がループの覆っている領域ということになります。

このループを作るには、緯度経度をまずはs2.Pointに変換し、そのポイントのリストからLoopを作る必要があります。

coords, _, _ := polyline.DecodeCoords([]byte(encodedLine))

points := make([]s2.Point, 0)
for _, coord := range coords {
    point := s2.PointFromLatLng(s2.LatLngFromDegrees(coord[0], coord[1]))
    points = append(points, point)
}
loop := s2.LoopFromPoints(points)

まずは、先程のライブラリを利用してエンコードされたポリラインをデコードし、それぞれの緯度経度からポイントを作成していますs2.PointFromLatLng そして、そのポイントをpointsにためておいて、s2.LoopFromPointsに渡してやることでs2.Loopが手に入っています。

ループと緯度経度の包含検査

f:id:Pocket7878_dev:20170805132416p:plain

例えば上のような領域をループにしていたとすると、以下のようにして、ループの中に緯度経度が含まれるかどうか判定できます。

shibuya := s2.LatLngFromDegrees(35.658034, 139.701636)
....<先程のコード等>
loop.ContainsPoint(s2.PointFromLatLng(shibuya))

shibuya変数が指しているのは渋谷駅の緯度経度なので、当然結果はtrueとなります。

ループの領域のセルによる被覆

以下のようなコードで、./polyline.txtファイルに書いたエンコードされたポリラインをloopにし、loopの領域を前回と同じようにs2.RegiconCovererを利用して セルのリストに変換します。

package main

import (
    "bufio"
    "fmt"
    "os"

    "github.com/golang/geo/s2"
    polyline "github.com/twpayne/go-polyline"
)

func readPolyline(filePath string) string {
    // ファイルを開く
    f, err := os.Open(filePath)
    if err != nil {
        fmt.Fprintf(os.Stderr, "File %s could not read: %v\n", filePath, err)
        os.Exit(1)
    }

    // 関数return時に閉じる
    defer f.Close()

    scanner := bufio.NewScanner(f)

    var line = ""
    for scanner.Scan() {
        line = scanner.Text()
    }
    if serr := scanner.Err(); serr != nil {
        fmt.Fprintf(os.Stderr, "File %s scan error: %v\n", filePath, err)
    }

    return line
}

func main() {
    encodedLine := readPolyline("./polyline.txt")
    coords, _, _ := polyline.DecodeCoords([]byte(encodedLine))

    rc := &s2.RegionCoverer{MaxLevel: 30, MaxCells: 200}
    points := make([]s2.Point, 0)
    for _, coord := range coords {
        point := s2.PointFromLatLng(s2.LatLngFromDegrees(coord[0], coord[1]))
        points = append(points, point)
    }
    loop := s2.LoopFromPoints(points)
    loopRegion := s2.Region(loop)

    for idx, c := range rc.Covering(loopRegion) {
        fmt.Printf("Cell ID [%d]: %v\n", idx, uint64(c))
    }
}

最大上限50の場合

f:id:Pocket7878_dev:20170805133623p:plain

最大上限100の場合

f:id:Pocket7878_dev:20170805134420p:plain

最大上限200の場合

f:id:Pocket7878_dev:20170805134621p:plain

ほぼ綺麗に覆われているように見えます。

前回から今の間にループのバグが修正されたので、無事多角形領域についてもセルに変換して検索することができるようになりました。

またこれらを使って得られた知見があったら共有いたします。

位置情報のハッシュをS2 Geometryライブラリを利用して計算して地球上の円を被覆してみた

背景

位置情報を扱いながら高速に検索する必要がある場合、DB上で範囲検索が容易にできると便利です。。 高速で位置情報を検索するためには、 GeoHashなどの位置情報のハッシュ計算をしておいて、ハッシュで範囲検索をするとインデックスを活用できて良いです。

今日はその位置情報のハッシュのなかでも高速な処理が求められるサービスでの導入事例の多く、 一方でドキュメントの少ないS2ライブラリについて調査したので、ざっくりまとめました。

S2

S2のライブラリはもともとはGoogleで開発されたライブラリで、 公式の情報としては解説のプレゼンがアップロードされている以外は、古びたソースコードGoogle Codeに上がっているのが分かる程度で、あまり情報はないように思えます。

アルゴリズム解説のドキュメントは、

http://blog.christianperone.com/2015/08/googles-s2-geometry-on-the-sphere-cells-and-hilbert-curve/

こちらの記事がわかりやすく、平面状をヒルベルト曲線で埋めていくことや、エンコードされたCellIDの包含判定が前方一致(厳密には末尾1ビットを削る等の処理は必要だが)でできる特徴などがわかりやすく解説されているので良いです。

S2他言語へのポート

S2のライブラリをGoに移植したものや、Pythonに移植したものが盛んに開発されているので利用すると良いと思います。

今回はGoへの移植版を利用してみます。 開発の現状としては球面幾何の世界を作るための、段階的なステップのR1, R2, R3の部分、S1の一部等は完了しているのですが、S2の部分はまだ本格的に利用するには機能が足りない部分等が多く見られますが、活発に開発はされているようなので、遠くない未来にいい感じになると思います。 PRを送ってみるのもいいかと思います。

円の被覆

S2は球面上での座標を扱ってくれるのですが、 球面上の円というのは、 実際には球の表面に沿って曲がっているのでキャップのような形になっています。 なので、球面上での円に相当する型はs2.Capという名称です(s2の部分はインポートのやり方によって変わりますが)

また、S2は地球に限らず、真球の表面を扱うものなので、球の半径を想定しておらず、基本的には 点と点の角度や、球の大円に対する比率で扱います。

たとえば、s2.CapFromCenterAngleという関数でs2.Capを作ることができるのですが、 引数は、キャップの頂点となる球面上の点と、キャップの縁の大円に対する比率で表現します。

f:id:Pocket7878_dev:20170802005315p:plain

図ですと、赤い部分が球を切ったときのCapです。ちょうど反対側の点まで行くと1で これがちょうどみかんの皮のように球の表面を完全にキャップが覆っている状態です。

なので、たとえば「渋谷駅を中心に半径4kmの円を描きたい」という風に思ったときは、 地球のサイズの球を想定して比率計算をする必要があります。 地球の円周の半分を仮に6371.01kmと置くと、比率は4 / 6371.01になります。

これを考慮してkmでCapを作れるようにするには

func capOnEarth(center s2.Point, radiusKm float64) s2.Cap {
    const earthRadiusKm = 6371.01
    ratio := (radiusKm / earthRadiusKm)
    return s2.CapFromCenterAngle(center, s1.Angle(ratio))
}

こんな風にする必要があります。

さて、次はS2の機能を利用してこの円をCellで埋めてみましょう。

s2.RegionCovererという構造体があり、これは指定された図形をCellで埋める作戦を 計算するための型です。

詳しいセルサイズの説明等はプレゼンを見ていただければ幸いですが、MaxLevel 30というところで、S2が扱える最小サイズのセルまで使ってもよいと指定し、さらにMaxCellsで被覆するにに使うセル数の最大数を指定しています。

shibuya := s2.LatLngFromDegrees(35.658034, 139.701636)
shibuyaCircle := capOnEarth(s2.PointFromLatLng(shibuya), 4)
rc := &s2.RegionCoverer{MaxLevel: 30, MaxCells: 50}
for idx, c := range rc.Covering(shibuyaCircle) {
    cell := s2.CellFromCellID(c)
    fmt.Printf("Cell [%d] vertex:\n", idx)
    for i := 0; i < 4; i++ {
        v := cell.Vertex(i)
        latLng := s2.LatLngFromPoint(v)  
        fmt.Printf("{lat: %v, lng: %v}\n", latLng.Lat, latLng.Lng)
    }
}

でどんなCellで円を埋められるのか計算できます。 この結果をGoogleMapで可視化してみたものが以下になります。

青色の円(重なっているので紫ですが)は GoogleMapの機能で渋谷駅から半径4kmを描いてみたものです。

セル上限50の場合

f:id:Pocket7878_dev:20170802005336p:plain

結構はみ出しているところが目立ちますね

セル上限100の場合

f:id:Pocket7878_dev:20170802005353p:plain

すこしはみ出しが減りました。

セル上限200の場合

f:id:Pocket7878_dev:20170802005412p:plain

かなりはみ出しが改善しました。

ユースケース

たとえば、データベース上に大量の位置情報を格納しておいて、必要に応じて高速に円の中に含まれているデータを取り出したい場合、あらからじめ位置情報をS2でCellIDに変換しておき、今回の円で得られたCellIDと前方一致で調査するという形になります。

今回は円だったのであまり有り難みが伝わりづらいのですが、 これがある地形内(ポリゴン内)に含まれている全データ等を求めたいときにデータベースの範囲インデックスを効かせて計算できるのは非常に高速に求められると思います。

残念ながらポリゴンのサポートはまだGo版のS2ではできていませんが、Python版のライブラリもありますし、MongoDBやUber、Forsquareなどのサービスでも利用されているので、位置情報を扱う何かをするときには検討してみても良いかもしれません。

まえからS2は気になっていて、今日調査してみたのでまとめてみました。さらに何か成果があれば追ってまとめます。

最小構成サーバーでは負荷に耐えられなくなってきたので、SidekiqのJob処理を別サーバーに移動した

NewAWSArchitecture.png

背景

サービスが成長しのユーザー数が増加してきたことによって、初期のサーバーの最小構成では ここ数日24時前後になるとロードアベレージが異常に上昇してしまって、 APIサーバーの応答時間が増加してしまうという問題がありました。

LoadAverageError.png

ロードアベレージがこの時最大16とかです。 CPUが1つの仮想マシンでこれは要するに全くちゃんと処理が追いついていない状況です

調査したこと

サーバーからのレスポンスが遅くロードアベレージが高いということは、 ネットワークの問題というよりは何らかのハードウェア資源が不足してるという状況が考えられます。(ポートも資源だという話はおいておいて)

CPU負荷の確認

幸いCloudWatchのダッシュボードでCPUのUsageを監視していたので、すぐに見られたのですが このときCPU負荷は常に90%以上を推移していてCPUが何らか圧迫されているように見えました。

SARをみた

sysstatというパッケージを入れておくと定期的にCPUが何に取られているかログを取ってくれるで便利です。sar -P ALL | headしてみたら、%sysや%iowaitは低かったのですが、%userが40%近い値になっていました。これはつまり何らかユーザープロセスがCPUを圧迫しているということです。

topでざっくりと負荷を確認

topコマンドは常識だと思いますが、topでしばらくCPUの負荷を観察したところ、定期的にbundleコマンドの何かが3つほど呼び出されてそれら個別に20%ずつほどCPUを食っているようでした。

psでしっかりと確認

ps aux --sort -pcpu | head -n4で、全プロセスをCPU処理を食っている割合で降順にソートし、トップ3を取り出してみました。 すると、サーバー側でwheneverというgem経由で登録されている 定期的にユーザー端末のヘルスチェックを行ったり、定期的なログを吐き出すようなジョブたちがCPUの大部分を占めていました。

このあと他のプロセスも見てみたのですが、 他のプロセスは基本的にCPU負荷は5%にも届かないような状態が多く、圧倒的にこれらのプロセスがCPUを食っていました。

これらの状況から、 以前から気がかりだった定期的な処理とAPIサーバーが同じところに乗っているという問題は早急に解決しないと行けないレベルになったと判断しました。

やったこと

Job処理をバッチサーバーに移動

とりあえずAPIサーバーからこれらの処理を追い出したかったので、ActiveJobを実行しているSidekiqAPIサーバーから追い出そうと決め、とりあえずGoのスクリプト等がcronで呼び出されていたサーバー(以下バッチサーバー)に移動しようと決めました。

ソースコードをバッチサーバにもデプロイ

デプロイ処理を現在はCapistranoで行っていたこともあり、rbenvの登録や必要なライブラリのインストール等を適宜バッチサーバーに実施してやり、デプロイ対象にバッチサーバーを登録してやったところすんなりCapistranoソースコードをバッチサーバーにデプロイできました。

具体的には、capistranoの環境ごとの設定ファイルconfig/deploy/production.rb等に新たなroleとして

role :batch, %w(<サーバー接続情報>)

を登録してやりました。これで本番環境のデプロイ時にバッチサーバーにもソースコードがアップロードされるようになりました。

Wheneverによるcrontabの登録をバッチサーバーのみにする

こちらもwhenever_roleという値を set :whenever_roles, :batchという感じで設定してやればwheneverのcron登録は先程のbatchロールにだけ実施されるようなりました。

気づき: どのロールに何を割り当てるかはdeploy.rbでまとめておくと楽

上記の話はプロダクション環境のみに関係のある話でステージング環境は以前すべて同じサーバーで問題ありません。なので、config/deploy.rbset :whenever_roles, :batchを書いておいてやって、具体的なbatchロールに属するサーバーはproduction.rbstaging.rbで設定する形にしておくと楽です。staging.rbではたとえばメインのサーバーと同一の接続情報にしておくことで同じサーバーですべて実施されます。

Redisをバッチサーバに移動

SidekiqではJobのバックエンドのキューはRedisが利用されているので、 ジョブを正しくキューするにはRedisもどこかに移さなくてはなりません。 一旦APIサーバーはAPIのみに注力しようと考え、暫定的にバッチサーバーにRedisも移動することにしました。 (本来はどこか専用のインスタンスに置くべきだと思います)

バッチサーバー上にRedisをインストールして起動して、 適宜ポートの開放やAWS上でのセキュリティーグループの設定等を済まして、 redis接続可能になったらOKです。 このとき、APIサーバーからもRedisにつなげておくようにしておくと、Sidekiqに付属しているWebコンソール経由で見張れるので便利です。

Sidekiq.configure_server do |config|
    if Rails.env.production?
        config.redis = { url: 'redis://<redisのホスト>:<redisのポート>', namespace: 'sidekiq' }
    else
        config.redis = { url: 'redis://127.0.0.1:6379', namespace: 'sidekiq' }
    end
end

Sidekiq.configure_client do |config|
    if Rails.env.production?
        config.redis = { url: 'redis://<redisのホスト>:<redisのポート>', namespace: 'sidekiq' }
    else
        config.redis = { url: 'redis://127.0.0.1:6379', namespace: 'sidekiq' }
    end
end
.....
mount Sidekiq::Web, at: "<どこか適切なパス>"

この段階で、cron経由やAPIサーバー経由でジョブがバッチサーバー上のredisに登録される様になりました。 Jobの実行自体はSidekiqが実行されているAPIサーバー上で実施されているので、 最後にSidekiqもバッチサーバー上に追い出す必要があります. (こちらも本来はSidekiq用の専用サーバーを立ててやってAutoScalingとかするのが良いと思います)

Sidekiqをバッチサーバー上でのみ実施する

こちらもCapistranoのお陰で超簡単です。

...
set :sidekiq_role, :sidekiq

sidekiq_roleを設定してやって、wheneverの時と同様に各環境ごとにサーバーを設定してやれば完了です。これで、デプロイ時にバッチサーバー上でSidekiqが実行されるようになります。

動作を確認しながら、段階的にAPIサーバー上でのcronを消し、sidekiqもkill -USR1 <pid>で動きを止めて様子を見ながらkill -TERM <pid>で止めて、Webの監視画面からsidekiqが正常に動いていることを確認したら無事完了です。

Fluentdをバッチサーバーにインストー

Jobの中にはFluentd経由でログを収集しているインスタンス向けてRails内部からログを送信するタイプのものもありました。バッチサーバー上のsidekiqのログを見ていたらFluentdとの接続エラーが出ていたので、適宜Fluentdの設定をしました。これはAPIサーバー上にFluentdを設定したときのItamaeのレシピが残っていたので、それを適宜再利用してすんなり完了しました。

バッチサーバーのインスタンスタイプを変更

バッチサーバーは本当に1日に一回程度Goのちょっとしたスクリプトが動くだけだったのでt2.microで運営していても問題がなかったのですが、 今回の移行で負荷が増大する懸念があったので、インスタンスタイプの変更を検討しました。

本番サーバー上でのメモリ使用率はそれなりに80%程度と高かったのですが、ps aux --sort -rssの結果を確認したところこれらはunicornのワーカーが使用していもので、バッチジョブとは無関係でした。一方でCPUの消費はほぼバッチによるものでした。

また、バッチの定期的なものだけでなく、オンラインで実施されるJobも含めて検討するとサービスの実施時間帯中のスポットで一気にユーザーの需要に合わせて高まる傾向が多く、ずっとCPUを消費するというよりは必要なときにパワーの出るタイプが望ましいと判断しました。

また、サービス拡大の時期であり、需要の上限が用意には見積もれず、またオートスケールを導入してしまうよりは一旦サービス中の負荷を確認できたほうが良いだろうと判断し、一旦大きめのインスタンスタイプで余裕を持つことにし、インスタンスタイプはt2.mediumを選択しました。

どうなった

CloudWatchで見張った

その晩は果たして上記の判断が正しいものだったのか不安だったので、CloudWatchでAPIサーバー及びバッチサーバーを見張ることにしました。

実施直後ロードアベレージが一気に下がった

まず、上記の施策を実施した時点でロードアベレージががくんと下がりました

スクリーンショット 2017-07-30 15.25.46.png

施策の実施直前は色々とサーバー内で必要なプロセスを止めたり、色々と作業をしていたので一時的にロードアベレージは上がっていますが、実施後に、一気に平均のロードアベレージが下がりました。負荷の高い時間帯でなくても平均してロードアベレージ0.4程度あったのですが、施策を実施した直後0.04程度に一気に下がりました。

サービス時間帯中もロードアベレージが異常事態を迎えなくなった

スクリーンショット 2017-07-30 15.33.29.png

実施して一晩たった記録ですが、最近発生してきていた異常な状態がなくなったことがわかります。やはりサービス時間野メイン時間には多少ロードアベレージが上がっていますが、一番高いもので1.99です。

レスポンスタイムも上がらなくなった

スクリーンショット 2017-07-30 15.35.33.png

ここしばらくでユーザー数が一気に伸びたこともあり、 2日連続で通信エラーに悩まされていて眠れない夜が続いていたのですが、今回の施策を実施して、一気に異常なピークが発生しなくなりました。

バッチサーバーはちょうどよい?

バッチサーバーはCloudWatchで見たところCPU負荷はピーク時にCloudWatch上で15%程度平常時には12%程度で、 メモリ使用率は30%程度です。t2.microはコアが2つなので、実際は24%~30%程度ですので、ベースラインパフォーマンスの40%は常に下回っている状態となりました。 参考リンク - AWS T2インスタンス

T2インスタンスはベースラインパフォーマンスを下回っていると自動的にCPUクレジットが溜まっていき、必要なときに頑張ってくれるのですが、現状のインスタンスタイプだとピーク時にもクレジットはたまり続けて居ますね。とはいえ、まだ一日しか様子は見ていないので、ここで小さくするのは早計だと思うので、一週間から一ヶ月程度様子を見た上で判断したいと思います。

やりたいこと

デプロイをDocker化

開発環境はDockerでビルドできているので、本番環境でもDockerに移行してECS等で管理したい。 Dockerイメージにするとエントリポイント等から柔軟に何を実行するか調節できそうで、オートスケール等でも 用意に対応できると思う。

Fastlane gemのSpaceshipを使ってiOSアプリのバージョン番号のリストを取得する

APIサーバーを書いていてクライアント側に非互換な変更が入ったときに、 サーバー側から「アプリに新しいバージョンが出てるから必ずバージョンアップしてから使ってね」という 形の強制アップデートを要求したいケースがあります。

セマンティックバージョニング

そういうときに、アプリのバージョンを大小比較するにあたっては、

セマンティック バージョニング 2.0.0 | Semantic Versioning

セマンティックバージョニングを採用しておくと大小比較ができるので便利ですし、 バージョンをどうするか悩まなくてもどういう変更の入ったリリースかからある程度自動的に決まるので考えなくて済みます。

アプリからはサーバーサイドへのリクエスト時に必ずアプリケーションのバージョンをヘッダーにつけさせるようにし、 サーバーサイドではそのバージョンをパースして最小バージョンでなかったら適宜バージョンアップデートを促す専用の レスポンスを返すようにします。

rubyだと以下のgemがセマンティックバージョンの処理をしてくれるので便利でした。

github.com

SpaceshipでiTunes Connectのデータを取得

SpaceshipというのはFastlaneというアプリのリリース自動化を手伝ってくれるツール兼Gemの中に含まれるライブラリなのですが、

github.com

この機能として、iTunes Connectから情報を引っ張ってくる機能があります。

require 'fastlane'

Spaceship::Tunes.login("<itunes connect email>", "<itunes connect password>")
app = Spaceship::Tunes::Application.find("<bundle id>")
version_strings = app.versions_history.map{|h| h.version_string}

これでversion_stringsのなかに["1.0.0", "1.0.1", "1.1.0", "2.0.0"]のようなバージョン番号の配列が入ります。 これを適宜サーバーサイドに設置した管理ページなどから選択してやって、それが最低バージョンの保証のコードに反映されるようにすると、 動的に最低バージョンの保証を変更することができるようになります。

手作業でバージョン番号を文字列で入れるようにしてしまうと、 打ち間違いやら、あり得ないバージョンが選択されてしまったりするので、 確実に存在するバージョンを選べるようにしておくと安心です。

またapp.edit_version.versionとやると、「リリース待ち(作ってから、ストアに出ていないバージョン)のバージョン」が取得でき app.live_version.versionとやると、ストアに出ている最新バージョンが取得できます。 適宜、上記で取得したバージョンにフィルタリングしたり選ぶときの候補名の装飾に使うと便利だと思います。

ニコ生のTSをダウンロードするスクリプトをRustで書いた

ニコ生のTSをダウンロードするツールというのがネットに何個か見つかって、画質選択ができたり、複数に分割されているものに 対応しているものがすぐには見つからなかったので勉強がてら書いてみました。

extern crate reqwest;
extern crate sxd_document;
extern crate sxd_xpath;
extern crate cookie;

use std::io::Read;
use reqwest::header::{SetCookie, Cookie};
use sxd_document::parser;
use sxd_xpath::{evaluate_xpath, Value};
use std::collections::HashMap;

fn gets(prompt: &str) -> String {
    println!("{}",prompt);
    let mut input_buf = String::new();
    std::io::stdin()
        .read_line(&mut input_buf)
        .expect("failed to read from stdin");
    return input_buf
}

/*
fn get_nico_live_server_time() -> String {
    let mut server_time_result = reqwest::get("http://live.nicovideo.jp/api/getservertime").unwrap();
    if !(server_time_result.status().is_success()) {
        panic!("Failed to get server time from nico live.");
    }

    let mut server_time_content = String::new();
    server_time_result.read_to_string(&mut server_time_content).unwrap();

    return server_time_content.trim_left_matches("servertime=").to_owned();
}

fn get_ticket(email: &str, pass: &str, server_time: &str) -> String {
    let params = [("mail", email), ("password", pass), ("site", "nicolive_encoder"), ("time", server_time)];
    let client = reqwest::Client::new().unwrap();
    let mut login_result = client.post("https://account.nicovideo.jp/api/v1/login").unwrap()
        .header(UserAgent::new("nicoliveenc/2.0.7"))
        .form(&params).unwrap()
        .send().unwrap();
    let mut login_content = String::new();
    login_result.read_to_string(&mut login_content).unwrap();

    // Parse login_content
    let login_xml = parser::parse(&login_content).expect("failed to parse XML");
    let login_document = login_xml.as_document();

    let ticket_value = evaluate_xpath(&login_document, "/nicovideo_user_response/ticket").expect("Retrieve ticket from login_xml failed.");
    return ticket_value.string();
}
*/

fn get_lv_status(email: &str, pass: &str, lv_num: &str) -> String {
    use std::time::Duration;
    use reqwest::RedirectPolicy;

    let params = [("mail", email), ("password", pass)];
    let client: reqwest::Client = reqwest::Client::builder().unwrap()
        .gzip(true)
        .redirect(RedirectPolicy::none())
        .timeout(Duration::from_secs(10))
        .build().unwrap();
    let login_result = client.post("https://account.nicovideo.jp/api/v1/login").unwrap()
        .form(&params).unwrap()
        .send().unwrap();

    let set_cookies = login_result.headers().iter()
        .filter_map(|header| {
            if header.is::<SetCookie>() {
                header.value::<SetCookie>()
            } else {
                None
            }
        })
        .next();
    let mut new_cookie: Cookie = Cookie::new();
    match set_cookies {
        Some(v) => {
            for cookie in v.iter() {
                let c = cookie::Cookie::parse(cookie.clone()).unwrap();
                new_cookie.set(c.name().to_owned(), c.value().to_owned());
            }
        },
        None => {
        }
    }

    let mut lv_status_result = client.get(&format!("http://live.nicovideo.jp/api/getplayerstatus/lv{}", lv_num)).unwrap()
        .header(new_cookie)
        .send().unwrap();

    let mut lv_status_content = String::new();
    lv_status_result.read_to_string(&mut lv_status_content).unwrap();

    return lv_status_content
}

fn get_queues(doc: &sxd_document::dom::Document) -> HashMap<String, Vec<String>> {
    use Value::*;
    let queues = evaluate_xpath(&doc, "/getplayerstatus/stream/quesheet/que").expect("Retrieve queue from lv_status failed.");
    let publish_list = match queues {
        String(ref val) => {
            vec![val.clone()]
        },
        Nodeset(ref ns) => {
            ns.document_order().iter().map(|n| n.string_value().clone()).filter(|s| s.starts_with("/publish")).collect::<Vec<_>>()
        },
        _ => {
            vec![]
        }
    };

    let mut queue_data: HashMap<std::string::String, Vec<std::string::String>> = HashMap::new();
    for publish in publish_list.iter() {
       let publish_data = publish.split(' ').collect::<Vec<_>>();
       let key = publish_data[1].to_owned();
       let value = publish_data[2].to_owned();
       if queue_data.contains_key(&key) {
           let mut old_vectors = queue_data.get_mut(&key).unwrap();
           old_vectors.push(value);
       } else {
           queue_data.insert(key, vec![value]);
       }
    }
    return queue_data
}

fn get_play_list(doc: &sxd_document::dom::Document) -> Vec<(String, String)> {
    use Value::*;
    let queues = evaluate_xpath(&doc, "/getplayerstatus/stream/quesheet/que").expect("Retrieve queue from lv_status failed.");
    match queues {
        Nodeset(ref ns) => {
            match ns.document_order().iter().map(|n| n.string_value().clone()).find(|s| s.starts_with("/play")) {
                Some(play_line) => {
                    let play_table_line = play_line.split(' ').collect::<Vec<_>>()[1];
                    let play_entries: Vec<&str> = play_table_line.trim_left_matches("case:").split(',').collect::<Vec<_>>();
                    let mut play_list = vec![];
                    for play_entry in play_entries.iter() {
                        let play = play_entry.split(':').collect::<Vec<_>>();
                        play_list.push((play[0].to_owned(), play[2].to_owned()));
                    }
                    return play_list
                },
                None => {
                    return vec![]
                }
            }
        },
        _ => {
            return vec![]
        }
    }
}

fn get_rtmp_urls(doc: &sxd_document::dom::Document) -> Vec<String> {
    use Value::*;

    let rtmp_urls = evaluate_xpath(&doc, "/getplayerstatus/rtmp/url").expect("Retrieve rtmp urls from lv_status failed.");
    match rtmp_urls {
        String(ref val) => {
            return vec![val.clone()]
        },
        Nodeset(ref ns) => {
            ns.document_order().iter().map(|n| n.string_value().clone()).collect::<Vec<_>>()
        },
        _ => {
            return vec![]
        }
    }
}

fn get_rtmp_tickets(doc: &sxd_document::dom::Document) -> Vec<String> {
    use Value::*;

    let rtmp_tickets = evaluate_xpath(&doc, "/getplayerstatus/rtmp/ticket").expect("Retrieve rtmp tickets from lv_status failed.");
    match rtmp_tickets {
        String(ref val) => {
            return vec![val.clone()]
        },
        Nodeset(ref ns) => {
            ns.document_order().iter().map(|n| n.string_value().clone()).collect::<Vec<_>>()
        },
        _ => {
            return vec![]
        }
    }
}

fn read_number(min: u32, max: u32) -> u32 {
    loop {
    let mut input_text = String::new();
    std::io::stdin()
        .read_line(&mut input_text)
        .expect("failed to read from stdin");

    let trimmed = input_text.trim();
    match trimmed.parse::<u32>() {
        Ok(i) if (i >= min && i <= max) => {
            return i
        }
        _ => { continue }
    };
    }
}

fn main() {
    use std::process::Command;
    let email = gets("Input email");
    let pass = gets("Input password");
    let lv_num = gets("Input lv number");

    let lv_status = get_lv_status(&email.trim(), &pass.trim(), &lv_num.trim());
    let lv_status_xml = parser::parse(&lv_status).expect("failed to parse XML");
    let lv_status_doc = lv_status_xml.as_document();

    let play_list = get_play_list(&lv_status_doc);
    let queues = get_queues(&lv_status_doc);
    let rtmp_urls = get_rtmp_urls(&lv_status_doc);
    let rtmp_tickets = get_rtmp_tickets(&lv_status_doc);

    //Ask user to choose queue
    let mut index = 1;
    for play_entry in play_list.iter() {
        println!("{}: {}", index, play_entry.0);
        index += 1;
    }
    let play_index = read_number(1, play_list.len() as u32);
    let file_name = gets("Input file name");
    let selected_play = &play_list[(play_index - 1) as usize].1;
    match queues.get(selected_play) {
        Some(qs) => {
            for (part, q) in qs.iter().enumerate() {
                let mut child = Command::new("rtmpdump")
                                    .arg("-r")
                                    .arg(rtmp_urls[0].clone())
                                    .arg("-y")
                                    .arg(&format!("mp4:{}", q))
                                    .arg("-C")
                                    .arg(&format!("S:{}", rtmp_tickets[0].clone()))
                                    .arg("-e")
                                    .arg("-o")
                                    .arg(&format!("{}_part{}.flv", file_name.trim().clone(), part + 1))
                                    .spawn()
                                    .expect("failed to execute process");
                let status = child.wait().unwrap();
                println!("{}", status);
            }
        },
        None => {
        }
    }
}

実行すると、メールアドレスやらパスワードやらとlvhogehogehogehoge部分を入力を促される。 入力するとデータ解析して、ストリームの画質やらを

1: sp
2: mobile
3: premium
4: default

こんな感じで聞かれるので、いい感じに数値入力して選択するとflvが幾つかのパートに分割されてダウンロードされる。

シェルスクリプト系でやろうとするとプロンプト出したりするの面倒そうだったしRustの勉強にもなるかなと思ってやってみました。 一応Githubにあげてあります。

github.com

GoogleMapのスタイルギャラリーSnazzyMapsのスタイルを適用したGoogle Static Mapの作成

GoogleMapのStaticMapを作るときに、スタイルをカスタマイズしてアプリケーションのテーマと雰囲気を合わせたいなという場合があります。 GoogleMapのスタイルはカスタマイズできるのですが、個別の項目を1からカスタマイズするよりは、 おしゃれな既存のテーマを見つけて、そこから微調整するほうがらくですよね。

そんな既存のテーマが大量にアップロードされている

snazzymaps.com

というありがたいページがあります。 このサイトはユーザーがカスタマイズしたスタイルを投稿してくれていて、ライセンスCCで公開されており、自由に使うことができます。

このサイトで、たとえば

Apple Maps-esque - Snazzy Maps - Free Styles for Google Maps

のようなスタイルページを開くとJS用のスタイルコードをJSONでダウンロードできるのですが、 このJS用のスタイルをStaticMapで利用するためには少し変換してやる必要があります。

具体的にはfeature:<featureType>|element:<elementType>|<StylersのKey Valueを:で繋いだもの>という形式に変換して それぞれ個別にstyleパラメータとして指定してやる必要があります。 また、カラーコードは#で始まる形式ではなく、0xから始める形式に変換してやる必要があります。

なので、SnazzyのページのURIを指定すると、そこからHTMLを解析してスタイルのJSONを取得して、 上記の変換をした上でStaticMapのURLを生成してくれるRubyスクリプトを書いてみました。

require 'json'
require 'open-uri'
require 'nokogiri'

SNAZZY_URL = '<Snazzy style page url>' # 'https://snazzymaps.com/style/42/apple-maps-esque'
GOOGLE_MAP_API_KEY = "<Google Map Api Key>"

if __FILE__ == $0
    baseURL = "https://maps.googleapis.com/maps/api/staticmap"
    # Default Parameters
    parameters = [
        {"size": "400x400"},
        {"scale": 2},
        {"maptype": "roadmap"},
        {"key": GOOGLE_MAP_API_KEY}
    ]

    # Retrieve Style from Snazzy
    charset = nil
    html = open(SNAZZY_URL) do |f|
      charset = f.charset
      f.read
    end
    page = Nokogiri::HTML.parse(html, nil, charset)
    styleJSONStr = page.search('pre#style-json').text
    styleJSON = JSON.parse(styleJSONStr)
    styleJSON.each do |entry|
      style = []
      featureType = entry["featureType"]
      if featureType
        style.push("feature:#{featureType}")
      end
      elementType = entry["elementType"]
      if elementType
        style.push("element:#{elementType}")
      end
      entry["stylers"].map { |styleEntry|
        k, v = styleEntry.first
        if k == "color"
          v = "0x#{v[1..-1]}"
        end
        style.push("#{k}:#{v}")
      }
      parameters.push({"style": style.join("|")})
    end

    # Markers
    src = "label:S|color:blue|渋谷駅"
    dst = "label:G|color:red|六本木一丁目駅"
    parameters.push({"markers": "#{src}"})
    parameters.push({"markers": "#{dst}"})

    # Add Polyline
    parameters.push({"path": "color:0x5BB2FFFF|weight:4|enc:omsxEystsYMo@Kw@GgACa@??EqBGo@GaAC_@Ek@KmG?iBCwAIwEUgFGwAWsFSuEGeBGmBKgBI}BAOCo@IeBG{@OwDWmGOkDGkAKeA??KkGIsC??CeABwAIsAYuCS}AKo@]{AmCcLm@{Ba@qAc@uAw@qB_@aAQc@s@iBwBiGyA}D_E}JYu@qAyCkBqE??k@wAo@yAWo@Mk@G[EUAW?[@SBODQDK"})

    # Generate QueryParameter
    queryParameter = parameters.map {|entry|
      k,v = entry.first
      "#{k}=#{v}"
    }.join("&")

    # Generate & Open URL
    url = "#{baseURL}?#{queryParameter}"
    puts url
    system "open '#{url}'"
end

このスクリプトを実行すると以下の画像のようなStaticMapへのリンクが生成されます。

f:id:Pocket7878_dev:20170725135121p:plain

無事ギャラリーのスタイルが適用されたStaticMapを取得することができました。

僕が働いているAzit.incでは一緒に働けるエンジニアを募集しています!
採用情報 — 株式会社アジット|Azit Inc.