Recommendation Feature View

In this section, we present how to create a simple feature view for recommendation purposes.

note

To understand this section, you should be familiar with sources, transformers, pipelines, savers, and feature views.

Define the Problem

We aim to create a simple recommendation system that suggests tokens to a user based on the tokens bought or sold by similar addresses. We use cosine similarity to find similar addresses and then identify which tokens each address interacts with.
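
For intuition, cosine similarity scores two addresses by the angle between their per-token interaction vectors; identical buying/selling profiles score 1.0. Here is a minimal, self-contained sketch with made-up numbers (the vectors and names are purely illustrative):

import numpy as np

def cosine_similarity(u: np.ndarray, v: np.ndarray) -> float:
    # cos(u, v) = u . v / (|u| * |v|), ranging from -1 to 1 (1 = same direction).
    return float(np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v)))

# Hypothetical transaction counts of two addresses against the same three tokens.
address_a = np.array([5.0, 0.0, 2.0])
address_b = np.array([4.0, 1.0, 0.0])

print(cosine_similarity(address_a, address_b))  # ~0.90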

Steps

We follow these steps to solve the problem:

  • Create the Pipeline: First, we build a pipeline of transformers (trimmers, derivers, and vectorizers) that extracts features for each address and token and computes the cosine-similarity dataset.
  • Save the Result: Afterward, we save the result into the database for inference.
  • Realtime Inference: We will also discuss how to handle inference using the SDK in the real-time inference documentation.

Transactions Data

First, we need to understand what the transactions data looks like. This dataset contains a fraction of Ethereum's token-transfer transactions; you can fetch it using the Flipside source from the ethereum.core.ez_token_transfers table.

The data looks like this:

from_address   to_address   contract_address                             amount
address_1      address_2    0xdAC17F958D2ee523a2206206994597C13D831ec7   10000

Note that the data has other columns as well, but for now, we will focus on the columns we need. We use the feature trimmer to keep only the necessary columns.
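
Conceptually, this trimming is just a column projection. The pandas sketch below, built around one fabricated row, only illustrates the effect; in the pipeline itself the FeatureTrimmer transformer does this work:

import pandas as pd

# One fabricated transfer row, mirroring the table above.
transfers = pd.DataFrame(
    {
        "from_address": ["address_1"],
        "to_address": ["address_2"],
        "contract_address": ["0xdAC17F958D2ee523a2206206994597C13D831ec7"],
        "amount": [10000],
        "ez_token_transfers_id": ["id_1"],
        "block_number": [19710819],  # an example of a column we drop
    }
)

# Keep only the columns the pipeline needs.
needed = ["from_address", "to_address", "contract_address", "amount", "ez_token_transfers_id"]
transfers = transfers[needed]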

Design the Pipeline

Assuming we have fetched the transactions data, we can now design the pipeline.

Transformers

  • Feature Trimmer: First, we trim away the columns we don't need. We keep only "from_address", "to_address", "contract_address", and "amount", plus "ez_token_transfers_id", which later derivers use as a row identifier. We use this trimmer:

    FeatureTrimmer(
        columns=[
            configs.CONTRACT_ADDRESS_COL,
            configs.FROM_ADDRESS_COL,
            configs.TO_ADDRESS_COL,
            configs.AMOUNT_PRICE_COL,
            "ez_token_transfers_id",
        ]
    )
  • Low Transaction Trimmer: We are only interested in addresses with a minimum amount of activity; an address with just one transaction tells us little. For this, we use the low transaction trimmer, which here keeps only addresses at or above the 98.5th percentile of transaction counts (a rough pandas equivalent appears after this list):

    LowTransactionTrimmer(min_transaction_num=None, min_quantile=0.985)
  • Popular Contract Address: In the dataset, there may be many contract addresses, but we are only interested in the popular ones. We keep only the rows with these popular contract addresses using the contract trimmer:

    from seshat.utils.contracts import PopularContractsFinder

    ContractTrimmer(PopularContractsFinder().find, contract_list_kwargs={"limit": 200})

    You can change the limit based on your needs; here, we keep the 200 most popular tokens.

  • Address DataFrame: As mentioned, we need to find features for each address. So, we separate all addresses into a new dataframe called address. This task is done by the sframe from cols deriver:

    SFrameFromColsDeriver(result_col="address")
  • Features for Address: We can derive various features for addresses. For now, we derive the count of sent and received transactions for each address. To find these features, we use the feature for address deriver:

    FeatureForAddressDeriver(
        value_col="ez_token_transfers_id",
        result_col="sent_count",
        default_index_col="from_address",
        agg_func="nunique",
        is_numeric=False,
    )
    FeatureForAddressDeriver(
        value_col="ez_token_transfers_id",
        result_col="received_count",
        default_index_col="to_address",
        agg_func="nunique",
        is_numeric=False,
    )
  • Tokens DataFrame: Similar to the address dataframe, we want to have some features for the tokens. So, we separate all the tokens into a new dataframe called tokens.

    SFrameFromColsDeriver(
        group_keys={"default": "default", "address": "tokens"},
        result_col="token",
        cols=("contract_address",),
    )
  • Features for Tokens: Like addresses, we can derive several features for tokens. Here, we derive two simple ones: the unique sender and receiver counts.

    FeatureForAddressDeriver(
        group_keys={"default": "default", "address": "tokens"},
        value_col="from_address",
        result_col="sender_count",
        default_index_col="contract_address",
        address_index_col="token",
        agg_func="nunique",
        is_numeric=False,
    )
    FeatureForAddressDeriver(
        group_keys={"default": "default", "address": "tokens"},
        value_col="to_address",
        result_col="receiver_count",
        default_index_col="contract_address",
        address_index_col="token",
        agg_func="nunique",
        is_numeric=False,
    )
  • Address-Token DataFrame: We need to know which tokens each address has interacted with, so we create another dataframe called address-token. To derive it, we first separate the addresses into a new dataframe and then use mergers to attach the contract addresses each address has interacted with (a pandas sketch of this merge logic appears after this list).

    Steps:

    • Separate addresses into a new dataframe:

      SFrameFromColsDeriver(
          group_keys={"default": "default", "address": "address-token"},
          result_col="address",
      )
    • Find the tokens that each address has sold:

      Merger(
          group_keys={"default": "address-token", "other": "default"},
          left_on="address",
          right_on="from_address",
          right_schema=Schema(
              cols=[Col("contract_address"), Col("from_address")]
          ),
          axis=1,
          inplace=True,
          merge_how="inner",
      )
    • Find the tokens that each address has bought:

      Merger(
          group_keys={"default": "address-token", "other": "default"},
          right_schema=Schema(
              cols=[
                  Col("contract_address"),
                  Col("symbol"),
                  Col("to_address", "address"),
              ]
          ),
          axis=0,
          inplace=True,
      )
    • Remove duplicates, since an address may have both sold and bought the same contract address:

      DuplicateTrimmer(
          group_keys={"default": "address-token"},
          subset=["address", "symbol"],
      )
  • Features for Address-Token: Now that we have the address-token dataframe, we derive features for it: the counts of sent and received transactions that each address has with each of its tokens:

    FeatureForAddressDeriver(
        group_keys={"default": "default", "address": "address-token"},
        value_col="ez_token_transfers_id",
        result_col="sent_count",
        default_index_col=["from_address", "contract_address"],
        address_index_col=["address", "contract_address"],
        agg_func="count",
        is_numeric=False,
    )
    FeatureForAddressDeriver(
        group_keys={"default": "default", "address": "address-token"},
        value_col="ez_token_transfers_id",
        result_col="received_count",
        default_index_col=["to_address", "contract_address"],
        address_index_col=["address", "contract_address"],
        agg_func="count",
        is_numeric=False,
    )
  • Cosine Similarity: Finally, we compute the cosine similarity between addresses to identify similar ones. Cosine similarity operates on vectors; if no vector is supplied, the cosine similarity vectorizer falls back to the token pivot vectorizer with the count strategy, which it computes by default.

    To keep the result from growing too large, for each address we keep only its top 50 most similar addresses (see the top-k sketch after this list).

    CosineSimilarityVectorizer(
        group_keys={
            "default": "default",
            "cosine_sim": "cosine_sim",
            "vector": "vector",
        },
        square_shape=False,
        threshold="by_count",
        threshold_value=50,
    )
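
For intuition on the low transaction trimmer's min_quantile setting, here is a rough pandas equivalent. It is a sketch only; the actual trimmer may count activity differently (for example, including received transactions):

import pandas as pd

def trim_low_activity(df: pd.DataFrame, min_quantile: float = 0.985) -> pd.DataFrame:
    # Count transactions per sender and keep only addresses whose count
    # reaches the given quantile of the distribution.
    counts = df["from_address"].value_counts()
    threshold = counts.quantile(min_quantile)
    active = counts[counts >= threshold].index
    return df[df["from_address"].isin(active)]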
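
The address-token construction can be pictured in plain pandas as follows. This is a conceptual sketch, not the SDK's implementation, and it is simplified to contract_address (the real pipeline also carries a symbol column):

import pandas as pd

transfers = pd.DataFrame(
    {
        "from_address": ["a1", "a2"],
        "to_address": ["a2", "a1"],
        "contract_address": ["t1", "t2"],
    }
)
addresses = pd.DataFrame({"address": ["a1", "a2"]})

# Tokens each address has sold (axis=1: an inner merge on the sender column).
sold = addresses.merge(
    transfers[["contract_address", "from_address"]],
    left_on="address",
    right_on="from_address",
    how="inner",
)[["address", "contract_address"]]

# Tokens each address has bought (axis=0: rename the receiver column, stack rows).
bought = transfers[["contract_address", "to_address"]].rename(
    columns={"to_address": "address"}
)

# An address may have both sold and bought the same token, so deduplicate.
address_token = pd.concat([sold, bought]).drop_duplicates(
    subset=["address", "contract_address"]
)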
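
And the by_count thresholding corresponds roughly to keeping each address's 50 most similar neighbours. The column names below mirror the cosine_sim table saved later (address_1, address_2, cosine) and are otherwise assumptions:

import pandas as pd

def keep_top_k(cosine_sim: pd.DataFrame, k: int = 50) -> pd.DataFrame:
    # Sort by similarity, then take the first k rows per address.
    return (
        cosine_sim.sort_values("cosine", ascending=False)
        .groupby("address_1", sort=False)
        .head(k)
    )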

Define the Pipeline

After we have all of the transformers, we create the new pipeline and place all the above transformers into it.


pipeline = Pipeline(
    pipes=[
        FeatureTrimmer(
            columns=[
                configs.CONTRACT_ADDRESS_COL,
                configs.FROM_ADDRESS_COL,
                configs.TO_ADDRESS_COL,
                configs.AMOUNT_PRICE_COL,
                "ez_token_transfers_id",
            ]
        ),
        LowTransactionTrimmer(min_transaction_num=None, min_quantile=0.985),
        ContractTrimmer(
            PopularContractsFinder().find, contract_list_kwargs={"limit": 200}
        ),
        SFrameFromColsDeriver(result_col="address"),
        FeatureForAddressDeriver(
            value_col="ez_token_transfers_id",
            result_col="sent_count",
            default_index_col="from_address",
            agg_func="nunique",
            is_numeric=False,
        ),
        FeatureForAddressDeriver(
            value_col="ez_token_transfers_id",
            result_col="received_count",
            default_index_col="to_address",
            agg_func="nunique",
            is_numeric=False,
        ),
        SFrameFromColsDeriver(
            group_keys={"default": "default", "address": "tokens"},
            result_col="token",
            cols=("contract_address",),
        ),
        FeatureForAddressDeriver(
            group_keys={"default": "default", "address": "tokens"},
            value_col="from_address",
            result_col="sender_count",
            default_index_col="contract_address",
            address_index_col="token",
            agg_func="nunique",
            is_numeric=False,
        ),
        FeatureForAddressDeriver(
            group_keys={"default": "default", "address": "tokens"},
            value_col="to_address",
            result_col="receiver_count",
            default_index_col="contract_address",
            address_index_col="token",
            agg_func="nunique",
            is_numeric=False,
        ),
        SFrameFromColsDeriver(
            group_keys={"default": "default", "address": "address-token"},
            result_col="address",
        ),
        Merger(
            group_keys={"default": "address-token", "other": "default"},
            left_on="address",
            right_on="from_address",
            right_schema=Schema(cols=[Col("contract_address"), Col("from_address")]),
            axis=1,
            inplace=True,
            merge_how="inner",
        ),
        Merger(
            group_keys={"default": "address-token", "other": "default"},
            right_schema=Schema(
                cols=[
                    Col("contract_address"),
                    Col("symbol"),
                    Col("to_address", "address"),
                ]
            ),
            axis=0,
            inplace=True,
        ),
        DuplicateTrimmer(
            group_keys={"default": "address-token"},
            subset=["address", "symbol"],
        ),
        FeatureForAddressDeriver(
            group_keys={"default": "default", "address": "address-token"},
            value_col="ez_token_transfers_id",
            result_col="sent_count",
            default_index_col=["from_address", "contract_address"],
            address_index_col=["address", "contract_address"],
            agg_func="count",
            is_numeric=False,
        ),
        FeatureForAddressDeriver(
            group_keys={"default": "default", "address": "address-token"},
            value_col="ez_token_transfers_id",
            result_col="received_count",
            default_index_col=["to_address", "contract_address"],
            address_index_col=["address", "contract_address"],
            agg_func="count",
            is_numeric=False,
        ),
        CosineSimilarityVectorizer(
            group_keys={
                "default": "default",
                "cosine_sim": "cosine_sim",
                "vector": "vector",
            },
            square_shape=False,
            threshold="by_count",
            threshold_value=50,
        ),
    ]
)

Define the Source

After designing the pipeline, we define the Flipside source that will fetch the data for it.

FlipSideSource(
    api_key=API_KEY,
    filters={
        "BLOCK_NUMBER": {"val": 19710819, "op": ">="},
    },
)

Define the Saver

We want to save the result into the database.

Configs

  • For the tokens sf, which holds the features for each token, we use this save config:

    SaveConfig(
        sf_key="tokens",
        table="token_info",
        strategy="replace",
        indexes=[["address"], ["symbol"], ["address", "symbol"]],
        schema=Schema(
            cols=[
                Col("token", to="address", is_id=True),
                Col("symbol", update_func="replace"),
                Col("sender_count", dtype="Double", update_func="sum"),
                Col("receiver_count", dtype="Double", update_func="sum"),
            ],
        ),
    )

    Note that we add three indexes to this table:

    • a single-column index on address
    • a single-column index on symbol
    • a composite index on (address, symbol)
  • For the address-token sf, which shows the token features for each address:

    SaveConfig(
        sf_key="address-token",
        table="top_token",
        strategy="replace",
        schema=Schema(
            cols=[
                Col("address", is_id=True),
                Col("contract_address", "token", is_id=True),
                Col("sent_count", dtype="Double", update_func="replace"),
                Col("received_count", dtype="Double", update_func="replace"),
            ],
        ),
        indexes=["address", "token"],
    )
  • For the address sf, which contains the features of each address:

    SaveConfig(
        sf_key="address",
        table="address_info",
        schema=Schema(
            cols=[
                Col("address"),
                Col("sent_count", dtype="Double"),
                Col("received_count", dtype="Double"),
            ]
        ),
        indexes=["address"],
    )
  • And finally, for the cosine_sim sf (the lookup sketch after these configs shows how its indexes are used at inference):

    SaveConfig(
        sf_key="cosine_sim",
        table="cosine_sim",
        schema=Schema(
            cols=[
                Col(original="address_1", dtype="String"),
                Col(original="address_2", dtype="String"),
                Col(original="cosine", dtype="Float"),
            ]
        ),
        indexes=["address_1", "address_2", ["address_1", "address_2"]],
    )
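
To see why these indexes matter, consider how the cosine_sim table is typically queried at inference time: fetch the most similar addresses for a given address. The sqlite3 snippet below is purely illustrative (the SDK's storage layer and table DDL may differ); it shows the access pattern that the single and composite indexes accelerate:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cosine_sim (address_1 TEXT, address_2 TEXT, cosine REAL)"
)
conn.execute("CREATE INDEX ix_addr1 ON cosine_sim (address_1)")
conn.execute("CREATE INDEX ix_pair ON cosine_sim (address_1, address_2)")

# Fetch the most similar addresses for a given address; the address_1
# index turns this filter into an index lookup instead of a full scan.
rows = conn.execute(
    "SELECT address_2, cosine FROM cosine_sim "
    "WHERE address_1 = ? ORDER BY cosine DESC LIMIT 10",
    ("address_1",),
).fetchall()
print(rows)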

Saver

After that, we should add all configs into the saver:

from seshat.source.saver import Saver, SQLDBSaver
from seshat.source.saver.base import SaveConfig
from seshat.transformer.schema import Schema, Col

saver = SQLDBSaver(
    save_configs=[
        SaveConfig(
            sf_key="tokens",
            table="token_info",
            strategy="replace",
            indexes=[["address"], ["symbol"], ["address", "symbol"]],
            schema=Schema(
                cols=[
                    Col("token", to="address", is_id=True),
                    Col("symbol", update_func="replace"),
                    Col("sender_count", dtype="Double", update_func="sum"),
                    Col("receiver_count", dtype="Double", update_func="sum"),
                ],
            ),
        ),
        SaveConfig(
            sf_key="address-token",
            table="top_token",
            strategy="replace",
            schema=Schema(
                cols=[
                    Col("address", is_id=True),
                    Col("contract_address", "token", is_id=True),
                    Col("sent_count", dtype="Double", update_func="replace"),
                    Col("received_count", dtype="Double", update_func="replace"),
                ],
            ),
            indexes=["address", "token"],
        ),
        SaveConfig(
            sf_key="address",
            table="address_info",
            schema=Schema(
                cols=[
                    Col("address"),
                    Col("sent_count", dtype="Double"),
                    Col("received_count", dtype="Double"),
                ]
            ),
            indexes=["address"],
        ),
        SaveConfig(
            sf_key="cosine_sim",
            table="cosine_sim",
            schema=Schema(
                cols=[
                    Col(original="address_1", dtype="String"),
                    Col(original="address_2", dtype="String"),
                    Col(original="cosine", dtype="Float"),
                ]
            ),
            indexes=["address_1", "address_2", ["address_1", "address_2"]],
        ),
    ]
)

Place Everything into a Feature View

Now everything is ready to define a feature view to handle all the tasks in the right order.

  • write the feature view: First, we write a class that inherits from FeatureView:

    class TokenRecommendation(FeatureView):
        online = False
        name = "Token Recommendation with FlipSide as Source and save result to database"

    Note that we set online to False because this feature view is meant for offline training.

  • offline source: The source previously defined should be added to the feature view:

    class TokenRecommendation(FeatureView):
        online = False
        name = "Token Recommendation with FlipSide as Source and save result to database"

        offline_source = FlipSideSource(
            api_key=API_KEY,
            filters={
                "BLOCK_NUMBER": {"val": 19710819, "op": ">="},
            },
        )
  • offline pipeline: The pipeline that we designed should be added to the feature view:

    class TokenRecommendation(FeatureView):
        online = False
        name = "Token Recommendation with FlipSide as Source and save result to database"

        offline_source = FlipSideSource(
            api_key=API_KEY,
            filters={
                "BLOCK_NUMBER": {"val": 19710819, "op": ">="},
            },
        )
        offline_pipeline = pipeline
  • saver: The last step is to place the saver into the feature view, as sketched below. Note that the saver only works in offline mode.
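
Putting the pieces together, the finished class might look like the following sketch. The offline_saver attribute name is an assumption for illustration (the SDK may expect a different field name for the saver); everything else mirrors the definitions above:

class TokenRecommendation(FeatureView):
    online = False
    name = "Token Recommendation with FlipSide as Source and save result to database"

    offline_source = FlipSideSource(
        api_key=API_KEY,
        filters={
            "BLOCK_NUMBER": {"val": 19710819, "op": ">="},
        },
    )
    offline_pipeline = pipeline
    offline_saver = saver  # assumed attribute name; check the FeatureView reference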

Run the Feature View

Now everything is ready to run the feature view. First, we should create a new instance of it:

view = TokenRecommendation()

Calling the instance then starts the whole process of fetching, transforming, and saving:

view()